From c9c2b26dfeeba2e9dd6909ea94cf457c179a7e37 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 12 Jan 2024 10:42:44 +0000 Subject: [PATCH] feature(VirtualFile): implement tokio-epoll-uring IO engine --- pageserver/src/lib.rs | 1 + pageserver/src/virtual_file.rs | 9 +++++- pageserver/src/virtual_file/io_engine.rs | 14 +++++++++ pageserver/src/virtual_file/open_options.rs | 34 +++++++++++++++++++++ 4 files changed, 57 insertions(+), 1 deletion(-) diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 26070e0cc1..bcde1166b7 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -1,3 +1,4 @@ +#![recursion_limit = "300"] #![deny(clippy::undocumented_unsafe_blocks)] mod auth; diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index f2fe617b10..4a0b34fbef 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -20,7 +20,7 @@ use std::fs::{self, File}; use std::io::{Error, ErrorKind, Seek, SeekFrom}; use tokio_epoll_uring::IoBufMut; -use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd}; +use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}; use std::os::unix::fs::FileExt; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; @@ -709,6 +709,13 @@ impl FileGuard { } } +impl tokio_epoll_uring::IoFd for FileGuard { + unsafe fn as_fd(&self) -> RawFd { + let owned_fd: &OwnedFd = self.as_ref(); + owned_fd.as_raw_fd() + } +} + #[cfg(test)] impl VirtualFile { pub(crate) async fn read_blk( diff --git a/pageserver/src/virtual_file/io_engine.rs b/pageserver/src/virtual_file/io_engine.rs index 91c94ce969..8619cbf661 100644 --- a/pageserver/src/virtual_file/io_engine.rs +++ b/pageserver/src/virtual_file/io_engine.rs @@ -22,6 +22,7 @@ #[strum(serialize_all = "kebab-case")] pub enum IoEngineKind { StdFs, + TokioEpollUring, } static IO_ENGINE: once_cell::sync::OnceCell = once_cell::sync::OnceCell::new(); @@ -90,6 +91,19 @@ impl IoEngineKind { drop(dst); ((file_guard, buf), res) } + IoEngineKind::TokioEpollUring => { + let system = tokio_epoll_uring::thread_local_system().await; + let (resources, res) = system.read(file_guard, offset, buf).await; + ( + resources, + res.map_err(|e| match e { + tokio_epoll_uring::Error::Op(e) => e, + tokio_epoll_uring::Error::System(system) => { + std::io::Error::new(std::io::ErrorKind::Other, system) + } + }), + ) + } } } } diff --git a/pageserver/src/virtual_file/open_options.rs b/pageserver/src/virtual_file/open_options.rs index 4c2148602a..eb925327b1 100644 --- a/pageserver/src/virtual_file/open_options.rs +++ b/pageserver/src/virtual_file/open_options.rs @@ -6,12 +6,16 @@ use std::{os::fd::OwnedFd, path::Path}; #[derive(Debug, Clone)] pub enum OpenOptions { StdFs(std::fs::OpenOptions), + TokioEpollUring(tokio_epoll_uring::ops::open_at::OpenOptions), } impl Default for OpenOptions { fn default() -> Self { match super::io_engine::get() { IoEngineKind::StdFs => Self::StdFs(std::fs::OpenOptions::new()), + IoEngineKind::TokioEpollUring => { + Self::TokioEpollUring(tokio_epoll_uring::ops::open_at::OpenOptions::new()) + } } } } @@ -26,6 +30,9 @@ impl OpenOptions { OpenOptions::StdFs(x) => { let _ = x.read(read); } + OpenOptions::TokioEpollUring(x) => { + let _ = x.read(read); + } } self } @@ -35,6 +42,9 @@ impl OpenOptions { OpenOptions::StdFs(x) => { let _ = x.write(write); } + OpenOptions::TokioEpollUring(x) => { + let _ = x.write(write); + } } self } @@ -44,6 +54,9 @@ impl OpenOptions { OpenOptions::StdFs(x) => { let _ = x.create(create); } + OpenOptions::TokioEpollUring(x) => { + let _ = x.create(create); + } } self } @@ -53,6 +66,9 @@ impl OpenOptions { OpenOptions::StdFs(x) => { let _ = x.create_new(create_new); } + OpenOptions::TokioEpollUring(x) => { + let _ = x.create_new(create_new); + } } self } @@ -62,6 +78,9 @@ impl OpenOptions { OpenOptions::StdFs(x) => { let _ = x.truncate(truncate); } + OpenOptions::TokioEpollUring(x) => { + let _ = x.truncate(truncate); + } } self } @@ -69,6 +88,15 @@ impl OpenOptions { pub(in crate::virtual_file) async fn open(&self, path: &Path) -> std::io::Result { match self { OpenOptions::StdFs(x) => x.open(path).map(|file| file.into()), + OpenOptions::TokioEpollUring(x) => { + let system = tokio_epoll_uring::thread_local_system().await; + system.open(path, x).await.map_err(|e| match e { + tokio_epoll_uring::Error::Op(e) => e, + tokio_epoll_uring::Error::System(system) => { + std::io::Error::new(std::io::ErrorKind::Other, system) + } + }) + } } } } @@ -79,6 +107,9 @@ impl std::os::unix::prelude::OpenOptionsExt for OpenOptions { OpenOptions::StdFs(x) => { let _ = x.mode(mode); } + OpenOptions::TokioEpollUring(x) => { + let _ = x.mode(mode); + } } self } @@ -88,6 +119,9 @@ impl std::os::unix::prelude::OpenOptionsExt for OpenOptions { OpenOptions::StdFs(x) => { let _ = x.custom_flags(flags); } + OpenOptions::TokioEpollUring(x) => { + let _ = x.custom_flags(flags); + } } self }