mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-16 09:52:54 +00:00
feature(VirtualFile): implement tokio-epoll-uring IO engine
This commit is contained in:
@@ -1,3 +1,4 @@
|
||||
#![recursion_limit = "300"]
|
||||
#![deny(clippy::undocumented_unsafe_blocks)]
|
||||
|
||||
mod auth;
|
||||
|
||||
@@ -20,7 +20,7 @@ use std::fs::{self, File};
|
||||
use std::io::{Error, ErrorKind, Seek, SeekFrom};
|
||||
use tokio_epoll_uring::IoBufMut;
|
||||
|
||||
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd};
|
||||
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
|
||||
use std::os::unix::fs::FileExt;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
@@ -709,6 +709,13 @@ impl FileGuard {
|
||||
}
|
||||
}
|
||||
|
||||
impl tokio_epoll_uring::IoFd for FileGuard {
|
||||
unsafe fn as_fd(&self) -> RawFd {
|
||||
let owned_fd: &OwnedFd = self.as_ref();
|
||||
owned_fd.as_raw_fd()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl VirtualFile {
|
||||
pub(crate) async fn read_blk(
|
||||
|
||||
@@ -22,6 +22,7 @@
|
||||
#[strum(serialize_all = "kebab-case")]
|
||||
pub enum IoEngineKind {
|
||||
StdFs,
|
||||
TokioEpollUring,
|
||||
}
|
||||
|
||||
static IO_ENGINE: once_cell::sync::OnceCell<IoEngineKind> = once_cell::sync::OnceCell::new();
|
||||
@@ -90,6 +91,19 @@ impl IoEngineKind {
|
||||
drop(dst);
|
||||
((file_guard, buf), res)
|
||||
}
|
||||
IoEngineKind::TokioEpollUring => {
|
||||
let system = tokio_epoll_uring::thread_local_system().await;
|
||||
let (resources, res) = system.read(file_guard, offset, buf).await;
|
||||
(
|
||||
resources,
|
||||
res.map_err(|e| match e {
|
||||
tokio_epoll_uring::Error::Op(e) => e,
|
||||
tokio_epoll_uring::Error::System(system) => {
|
||||
std::io::Error::new(std::io::ErrorKind::Other, system)
|
||||
}
|
||||
}),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,12 +6,16 @@ use std::{os::fd::OwnedFd, path::Path};
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum OpenOptions {
|
||||
StdFs(std::fs::OpenOptions),
|
||||
TokioEpollUring(tokio_epoll_uring::ops::open_at::OpenOptions),
|
||||
}
|
||||
|
||||
impl Default for OpenOptions {
|
||||
fn default() -> Self {
|
||||
match super::io_engine::get() {
|
||||
IoEngineKind::StdFs => Self::StdFs(std::fs::OpenOptions::new()),
|
||||
IoEngineKind::TokioEpollUring => {
|
||||
Self::TokioEpollUring(tokio_epoll_uring::ops::open_at::OpenOptions::new())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -26,6 +30,9 @@ impl OpenOptions {
|
||||
OpenOptions::StdFs(x) => {
|
||||
let _ = x.read(read);
|
||||
}
|
||||
OpenOptions::TokioEpollUring(x) => {
|
||||
let _ = x.read(read);
|
||||
}
|
||||
}
|
||||
self
|
||||
}
|
||||
@@ -35,6 +42,9 @@ impl OpenOptions {
|
||||
OpenOptions::StdFs(x) => {
|
||||
let _ = x.write(write);
|
||||
}
|
||||
OpenOptions::TokioEpollUring(x) => {
|
||||
let _ = x.write(write);
|
||||
}
|
||||
}
|
||||
self
|
||||
}
|
||||
@@ -44,6 +54,9 @@ impl OpenOptions {
|
||||
OpenOptions::StdFs(x) => {
|
||||
let _ = x.create(create);
|
||||
}
|
||||
OpenOptions::TokioEpollUring(x) => {
|
||||
let _ = x.create(create);
|
||||
}
|
||||
}
|
||||
self
|
||||
}
|
||||
@@ -53,6 +66,9 @@ impl OpenOptions {
|
||||
OpenOptions::StdFs(x) => {
|
||||
let _ = x.create_new(create_new);
|
||||
}
|
||||
OpenOptions::TokioEpollUring(x) => {
|
||||
let _ = x.create_new(create_new);
|
||||
}
|
||||
}
|
||||
self
|
||||
}
|
||||
@@ -62,6 +78,9 @@ impl OpenOptions {
|
||||
OpenOptions::StdFs(x) => {
|
||||
let _ = x.truncate(truncate);
|
||||
}
|
||||
OpenOptions::TokioEpollUring(x) => {
|
||||
let _ = x.truncate(truncate);
|
||||
}
|
||||
}
|
||||
self
|
||||
}
|
||||
@@ -69,6 +88,15 @@ impl OpenOptions {
|
||||
pub(in crate::virtual_file) async fn open(&self, path: &Path) -> std::io::Result<OwnedFd> {
|
||||
match self {
|
||||
OpenOptions::StdFs(x) => x.open(path).map(|file| file.into()),
|
||||
OpenOptions::TokioEpollUring(x) => {
|
||||
let system = tokio_epoll_uring::thread_local_system().await;
|
||||
system.open(path, x).await.map_err(|e| match e {
|
||||
tokio_epoll_uring::Error::Op(e) => e,
|
||||
tokio_epoll_uring::Error::System(system) => {
|
||||
std::io::Error::new(std::io::ErrorKind::Other, system)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -79,6 +107,9 @@ impl std::os::unix::prelude::OpenOptionsExt for OpenOptions {
|
||||
OpenOptions::StdFs(x) => {
|
||||
let _ = x.mode(mode);
|
||||
}
|
||||
OpenOptions::TokioEpollUring(x) => {
|
||||
let _ = x.mode(mode);
|
||||
}
|
||||
}
|
||||
self
|
||||
}
|
||||
@@ -88,6 +119,9 @@ impl std::os::unix::prelude::OpenOptionsExt for OpenOptions {
|
||||
OpenOptions::StdFs(x) => {
|
||||
let _ = x.custom_flags(flags);
|
||||
}
|
||||
OpenOptions::TokioEpollUring(x) => {
|
||||
let _ = x.custom_flags(flags);
|
||||
}
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user