From 5e5eb2998f5b861a7ce9c10e4ca15a4f3907e2fe Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 29 Aug 2023 12:24:30 +0000 Subject: [PATCH] use `tokio_epoll_uring` for read path & VirtualFile::open This makes Delta/Image ::load fns fully tokio-epoll-uring (cherry picked from commit f51a08eaa107e19b3bb5b7e67cdae435282c2245) --- Cargo.lock | 52 ++++- pageserver/Cargo.toml | 3 + pageserver/src/tenant/block_io.rs | 14 +- pageserver/src/tenant/ephemeral_file.rs | 20 +- .../src/tenant/storage_layer/delta_layer.rs | 13 +- .../src/tenant/storage_layer/image_layer.rs | 23 ++- pageserver/src/virtual_file.rs | 194 +++++++++++++----- 7 files changed, 231 insertions(+), 88 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d605169986..f94f089bb6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2286,6 +2286,16 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "io-uring" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "460648e47a07a43110fbfa2e0b14afb2be920093c31e5dccc50e49568e099762" +dependencies = [ + "bitflags 1.3.2", + "libc", +] + [[package]] name = "ipnet" version = "2.7.2" @@ -2982,6 +2992,7 @@ dependencies = [ "tenant_size_model", "thiserror", "tokio", + "tokio-epoll-uring", "tokio-io-timeout", "tokio-postgres", "tokio-tar", @@ -4789,18 +4800,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.40" +version = "1.0.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "978c9a314bd8dc99be594bc3c175faaa9794be04a5a5e153caba6915336cebac" +checksum = "97a802ec30afc17eee47b2855fc72e0c4cd62be9b4efe6591edde0ec5bd68d8f" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.40" +version = "1.0.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f" +checksum = "6bb623b56e39ab7dcd4b1b98bb6c8f8d907ed255b18de254088016b27a8ee19b" dependencies = [ "proc-macro2", "quote", @@ -4888,22 +4899,38 @@ dependencies = [ [[package]] name = "tokio" -version = "1.28.1" +version = "1.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0aa32867d44e6f2ce3385e89dceb990188b8bb0fb25b0cf576647a6f98ac5105" +checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9" dependencies = [ - "autocfg", + "backtrace", "bytes", "libc", "mio", "num_cpus", + "parking_lot 0.12.1", "pin-project-lite", "signal-hook-registry", - "socket2 0.4.9", + "socket2 0.5.3", "tokio-macros", "windows-sys 0.48.0", ] +[[package]] +name = "tokio-epoll-uring" +version = "0.1.0" +source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#53c832d322244f704c7612125496df5ac117ac39" +dependencies = [ + "futures", + "once_cell", + "scopeguard", + "thiserror", + "tokio", + "tokio-util", + "tracing", + "uring-common", +] + [[package]] name = "tokio-io-timeout" version = "1.2.0" @@ -5436,6 +5463,15 @@ dependencies = [ "webpki-roots 0.23.1", ] +[[package]] +name = "uring-common" +version = "0.1.0" +source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#53c832d322244f704c7612125496df5ac117ac39" +dependencies = [ + "io-uring", + "libc", +] + [[package]] name = "url" version = "2.3.1" diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 3eb01003df..c5b55c4b9d 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -82,6 +82,9 @@ enum-map.workspace = true enumset.workspace = true strum.workspace = true strum_macros.workspace = true +# WIP PR: https://github.com/neondatabase/tokio-epoll-uring/pull/25 +#tokio-epoll-uring = { path = "../../tokio-epoll-uring/tokio-epoll-uring" } +tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" } [dev-dependencies] criterion.workspace = true diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs index 0617017528..38305e9f4e 100644 --- a/pageserver/src/tenant/block_io.rs +++ b/pageserver/src/tenant/block_io.rs @@ -5,10 +5,10 @@ use super::ephemeral_file::EphemeralFile; use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner}; use crate::context::RequestContext; -use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ}; +use crate::page_cache::{self, PageReadGuard, PageWriteGuard, ReadBufResult, PAGE_SZ}; use crate::virtual_file::VirtualFile; use bytes::Bytes; -use std::ops::{Deref, DerefMut}; +use std::ops::Deref; /// This is implemented by anything that can read 8 kB (PAGE_SZ) /// blocks, using the page cache @@ -169,7 +169,11 @@ impl FileBlockReader { } /// Read a page from the underlying file into given buffer. - async fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> { + async fn fill_buffer( + &self, + buf: PageWriteGuard<'static>, + blkno: u32, + ) -> Result, std::io::Error> { assert!(buf.len() == PAGE_SZ); self.file .read_exact_at(buf, blkno as u64 * PAGE_SZ as u64) @@ -196,9 +200,9 @@ impl FileBlockReader { ) })? { ReadBufResult::Found(guard) => Ok(guard.into()), - ReadBufResult::NotFound(mut write_guard) => { + ReadBufResult::NotFound(write_guard) => { // Read the page from disk into the buffer - self.fill_buffer(write_guard.deref_mut(), blknum).await?; + let write_guard = self.fill_buffer(write_guard, blknum).await?; Ok(write_guard.mark_valid().into()) } } diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index 9a06d9df61..35b3a0c701 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -8,7 +8,6 @@ use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader}; use crate::virtual_file::VirtualFile; use camino::Utf8PathBuf; use std::cmp::min; -use std::fs::OpenOptions; use std::io::{self, ErrorKind}; use std::ops::DerefMut; use std::sync::atomic::AtomicU64; @@ -44,11 +43,11 @@ impl EphemeralFile { "ephemeral-{filename_disambiguator}" ))); - let file = VirtualFile::open_with_options( - &filename, - OpenOptions::new().read(true).write(true).create(true), - ) - .await?; + let file = { + let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new(); + options.read(true).write(true).create(true); + VirtualFile::open_with_options_async(&filename, options).await? + }; Ok(EphemeralFile { page_cache_file_id: page_cache::next_file_id(), @@ -88,11 +87,10 @@ impl EphemeralFile { page_cache::ReadBufResult::Found(guard) => { return Ok(BlockLease::PageReadGuard(guard)) } - page_cache::ReadBufResult::NotFound(mut write_guard) => { - let buf: &mut [u8] = write_guard.deref_mut(); - debug_assert_eq!(buf.len(), PAGE_SZ); - self.file - .read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64) + page_cache::ReadBufResult::NotFound(write_guard) => { + let write_guard = self + .file + .read_exact_at(write_guard, blknum as u64 * PAGE_SZ as u64) .await?; let read_guard = write_guard.mark_valid(); return Ok(BlockLease::PageReadGuard(read_guard)); diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index dc5e9e33af..4d13ade5ac 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -634,12 +634,13 @@ impl DeltaLayer { where F: Fn(Summary) -> Summary, { - let file = VirtualFile::open_with_options( - path, - &*std::fs::OpenOptions::new().read(true).write(true), - ) - .await - .with_context(|| format!("Failed to open file '{}'", path))?; + let file = { + let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new(); + options.read(true).write(true); + VirtualFile::open_with_options_async(path, options) + .await + .with_context(|| format!("Failed to open file '{}'", path))? + }; let file = FileBlockReader::new(file); let summary_blk = file.read_blk(0, ctx).await?; let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?; diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index bbe45882a9..2394c0a25d 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -319,12 +319,13 @@ impl ImageLayer { where F: Fn(Summary) -> Summary, { - let file = VirtualFile::open_with_options( - path, - &*std::fs::OpenOptions::new().read(true).write(true), - ) - .await - .with_context(|| format!("Failed to open file '{}'", path))?; + let file = { + let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new(); + options.read(true).write(true); + VirtualFile::open_with_options_async(path, options) + .await + .with_context(|| format!("Failed to open file '{}'", path))? + }; let file = FileBlockReader::new(file); let summary_blk = file.read_blk(0, ctx).await?; let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?; @@ -486,11 +487,11 @@ impl ImageLayerWriterInner { }, ); info!("new image layer {path}"); - let mut file = VirtualFile::open_with_options( - &path, - std::fs::OpenOptions::new().write(true).create_new(true), - ) - .await?; + let mut file = { + let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new(); + options.write(true).create_new(true); + VirtualFile::open_with_options_async(&path, options).await? + }; // make room for the header block file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?; let blob_writer = BlobWriter::new(file, PAGE_SZ as u64); diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 0b736a7f7e..fad0430650 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -11,11 +11,12 @@ //! src/backend/storage/file/fd.c //! use crate::metrics::{StorageIoOperation, STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC}; -use crate::tenant::TENANTS_SEGMENT_NAME; +use crate::page_cache::PageWriteGuard; use camino::{Utf8Path, Utf8PathBuf}; use once_cell::sync::OnceCell; -use std::fs::{self, File, OpenOptions}; +use std::fs::{self, File}; use std::io::{Error, ErrorKind, Seek, SeekFrom}; +use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd}; use std::os::unix::fs::FileExt; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; @@ -54,7 +55,7 @@ pub struct VirtualFile { /// opened, in the VirtualFile::create() function, and strip the flag before /// storing it here. pub path: Utf8PathBuf, - open_options: OpenOptions, + open_options: tokio_epoll_uring::ops::open_at::OpenOptions, // These are strings becase we only use them for metrics, and those expect strings. // It makes no sense for us to constantly turn the `TimelineId` and `TenantId` into @@ -277,17 +278,17 @@ macro_rules! with_file { impl VirtualFile { /// Open a file in read-only mode. Like File::open. pub async fn open(path: &Utf8Path) -> Result { - Self::open_with_options(path, OpenOptions::new().read(true)).await + let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new(); + options.read(true); + Self::open_with_options_async(path, options).await } /// Create a new file for writing. If the file exists, it will be truncated. /// Like File::create. pub async fn create(path: &Utf8Path) -> Result { - Self::open_with_options( - path, - OpenOptions::new().write(true).create(true).truncate(true), - ) - .await + let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new(); + options.write(true).create(true).truncate(true); + Self::open_with_options_async(path, options).await } /// Open a file with given options. @@ -295,6 +296,7 @@ impl VirtualFile { /// Note: If any custom flags were set in 'open_options' through OpenOptionsExt, /// they will be applied also when the file is subsequently re-opened, not only /// on the first time. Make sure that's sane! + #[cfg(test)] pub async fn open_with_options( path: &Utf8Path, open_options: &OpenOptions, @@ -355,15 +357,15 @@ impl VirtualFile { )); }; std::fs::remove_file(tmp_path).or_else(fs_ext::ignore_not_found)?; - let mut file = Self::open_with_options( - tmp_path, - OpenOptions::new() + let mut file = { + let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new(); + options .write(true) // Use `create_new` so that, if we race with ourselves or something else, // we bail out instead of causing damage. - .create_new(true), - ) - .await?; + .create_new(true); + Self::open_with_options_async(tmp_path, options).await? + }; file.write_all(content).await?; file.sync_all().await?; drop(file); // before the rename, that's important! @@ -374,12 +376,74 @@ impl VirtualFile { // the current `find_victim_slot` impl might pick the same slot for both // VirtualFile., and it eventually does a blocking write lock instead of // try_lock. - let final_parent_dirfd = - Self::open_with_options(final_path_parent, OpenOptions::new().read(true)).await?; + let final_parent_dirfd = { + let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new(); + options.read(true); + Self::open_with_options_async(final_path_parent, options).await? + }; final_parent_dirfd.sync_all().await?; Ok(()) } + /// Open a file with given options. + /// + /// Note: If any custom flags were set in 'open_options' through OpenOptionsExt, + /// they will be applied also when the file is subsequently re-opened, not only + /// on the first time. Make sure that's sane! + pub async fn open_with_options_async( + path: &Utf8Path, + open_options: tokio_epoll_uring::ops::open_at::OpenOptions, + ) -> Result { + let path_str = path.to_string(); + let parts = path_str.split('/').collect::>(); + let tenant_id; + let timeline_id; + if parts.len() > 5 && parts[parts.len() - 5] == "tenants" { + tenant_id = parts[parts.len() - 4].to_string(); + timeline_id = parts[parts.len() - 2].to_string(); + } else { + tenant_id = "*".to_string(); + timeline_id = "*".to_string(); + } + let (handle, mut slot_guard) = get_open_files().find_victim_slot().await; + + slot_guard.file = Some({ + let system = tokio_epoll_uring::thread_local_system().await; + let file: OwnedFd = system + .open(path, &open_options) + .await + .map_err(|e| match e { + tokio_epoll_uring::Error::Op(e) => e, + tokio_epoll_uring::Error::System(system) => { + std::io::Error::new(std::io::ErrorKind::Other, system) + } + })?; + let file = File::from(file); + file + }); + + // Strip all options other than read and write. + // + // It would perhaps be nicer to check just for the read and write flags + // explicitly, but OpenOptions doesn't contain any functions to read flags, + // only to set them. + let mut reopen_options = open_options; + reopen_options.create(false); + reopen_options.create_new(false); + reopen_options.truncate(false); + + let vfile = VirtualFile { + handle: RwLock::new(handle), + pos: 0, + path: path.to_path_buf(), + open_options: reopen_options, + tenant_id, + timeline_id, + }; + + Ok(vfile) + } + /// Call File::sync_all() on the underlying File. pub async fn sync_all(&self) -> Result<(), Error> { with_file!(self, StorageIoOperation::Fsync, |file| file @@ -399,7 +463,7 @@ impl VirtualFile { /// /// We are doing it via a macro as Rust doesn't support async closures that /// take on parameters with lifetimes. - async fn lock_file(&self) -> Result, Error> { + async fn lock_file(&self) -> Result, Error> { let open_files = get_open_files(); let mut handle_guard = { @@ -442,7 +506,20 @@ impl VirtualFile { let (handle, mut slot_guard) = open_files.find_victim_slot().await; // Open the physical file - let file = observe_duration!(StorageIoOperation::Open, self.open_options.open(&self.path))?; + let file = { + let system = tokio_epoll_uring::thread_local_system().await; + let file: OwnedFd = system + .open(&self.path, &self.open_options) + .await + .map_err(|e| match e { + tokio_epoll_uring::Error::Op(e) => e, + tokio_epoll_uring::Error::System(system) => { + std::io::Error::new(std::io::ErrorKind::Other, system) + } + })?; + let file = File::from(file); + file + }; // Store the File in the slot and update the handle in the VirtualFile // to point to it. @@ -489,24 +566,59 @@ impl VirtualFile { } // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135 - pub async fn read_exact_at(&self, mut buf: &mut [u8], mut offset: u64) -> Result<(), Error> { - while !buf.is_empty() { - match self.read_at(buf, offset).await { - Ok(0) => { - return Err(Error::new( - std::io::ErrorKind::UnexpectedEof, - "failed to fill whole buffer", - )) - } - Ok(n) => { - buf = &mut buf[n..]; - offset += n as u64; - } - Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} - Err(e) => return Err(e), + pub async fn read_exact_at( + &self, + write_guard: PageWriteGuard<'static>, + offset: u64, + ) -> Result, Error> { + let file_guard: FileGuard<'static> = self.lock_file().await?; + + let system = tokio_epoll_uring::thread_local_system().await; + struct PageWriteGuardBuf { + buf: PageWriteGuard<'static>, + init_up_to: usize, + } + unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf { + fn stable_ptr(&self) -> *const u8 { + self.buf.as_ptr() + } + fn bytes_init(&self) -> usize { + self.init_up_to + } + fn bytes_total(&self) -> usize { + self.buf.len() } } - Ok(()) + unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf { + fn stable_mut_ptr(&mut self) -> *mut u8 { + self.buf.as_mut_ptr() + } + + unsafe fn set_init(&mut self, pos: usize) { + assert!(pos <= self.buf.len()); + self.init_up_to = pos; + } + } + let buf = PageWriteGuardBuf { + buf: write_guard, + init_up_to: 0, + }; + let owned_fd = unsafe { OwnedFd::from_raw_fd(file_guard.as_ref().as_raw_fd()) }; + let guard = scopeguard::guard(file_guard, |_| { + panic!("must not drop future while operation is ongoing (todo: pass file_guard to tokio_epoll_uring to aovid this)") + }); + let ((owned_fd, buf), res) = system.read(owned_fd, offset, buf).await; + let _ = OwnedFd::into_raw_fd(owned_fd); + let _ = scopeguard::ScopeGuard::into_inner(guard); + let PageWriteGuardBuf { + buf: write_guard, + init_up_to, + } = buf; + if let Ok(num_read) = res { + assert!(init_up_to == num_read); // TODO need to deal with short reads here + } + res.map(|_| write_guard) + .map_err(|e| Error::new(ErrorKind::Other, e)) } // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235 @@ -556,18 +668,6 @@ impl VirtualFile { Ok(n) } - pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result { - let result = with_file!(self, StorageIoOperation::Read, |file| file - .as_ref() - .read_at(buf, offset)); - if let Ok(size) = result { - STORAGE_IO_SIZE - .with_label_values(&["read", &self.tenant_id, &self.timeline_id]) - .add(size as i64); - } - result - } - async fn write_at(&self, buf: &[u8], offset: u64) -> Result { let result = with_file!(self, StorageIoOperation::Write, |file| file .as_ref()