make read_exact_at cancel-safe

Pass `FileGuard` to the tokio-epoll-uring read() operatio.
and avoids the nasty scopeguard usage.

This makes the `read_exact_at` cancel-safe.

Depends on https://github.com/neondatabase/tokio-epoll-uring/pull/27
This commit is contained in:
Christian Schwarz
2023-12-11 15:43:31 +00:00
parent 0ca94f5b53
commit e4ba18d27b
3 changed files with 41 additions and 29 deletions

4
Cargo.lock generated
View File

@@ -5158,7 +5158,7 @@ dependencies = [
[[package]]
name = "tokio-epoll-uring"
version = "0.1.0"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#53c832d322244f704c7612125496df5ac117ac39"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=problame/generic-owned-fd#c57c5867ae9511f2f2d9ba170f493bbab73a2eee"
dependencies = [
"futures",
"once_cell",
@@ -5714,7 +5714,7 @@ dependencies = [
[[package]]
name = "uring-common"
version = "0.1.0"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#53c832d322244f704c7612125496df5ac117ac39"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=problame/generic-owned-fd#c57c5867ae9511f2f2d9ba170f493bbab73a2eee"
dependencies = [
"io-uring",
"libc",

View File

@@ -85,7 +85,7 @@ strum.workspace = true
strum_macros.workspace = true
# WIP PR: https://github.com/neondatabase/tokio-epoll-uring/pull/25
#tokio-epoll-uring = { path = "../../tokio-epoll-uring/tokio-epoll-uring" }
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" }
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "problame/generic-owned-fd" }
[dev-dependencies]
criterion.workspace = true

View File

@@ -16,9 +16,10 @@ use camino::{Utf8Path, Utf8PathBuf};
use once_cell::sync::OnceCell;
use std::fs::{self, File};
use std::io::{Error, ErrorKind, Seek, SeekFrom};
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd};
use std::os::fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
use std::os::unix::fs::FileExt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::time::Instant;
use utils::fs_ext;
@@ -105,7 +106,7 @@ struct SlotInner {
tag: u64,
/// the underlying file
file: Option<File>,
file: Option<OwnedFd>,
}
impl OpenFiles {
@@ -270,7 +271,7 @@ macro_rules! observe_duration {
macro_rules! with_file {
($this:expr, $op:expr, | $ident:ident | $($body:tt)*) => {{
let $ident = $this.lock_file().await?;
let mut $ident = $this.lock_file().await?;
observe_duration!($op, $($body)*)
}};
}
@@ -424,7 +425,6 @@ impl VirtualFile {
std::io::Error::new(std::io::ErrorKind::Other, system)
}
})?;
let file = File::from(file);
file
}));
@@ -452,15 +452,13 @@ impl VirtualFile {
/// Call File::sync_all() on the underlying File.
pub async fn sync_all(&self) -> Result<(), Error> {
with_file!(self, StorageIoOperation::Fsync, |file| file
.as_ref()
.sync_all())
with_file!(self, StorageIoOperation::Fsync, |file_guard| file_guard
.with_std_file(|std_file| std_file.sync_all()))
}
pub async fn metadata(&self) -> Result<fs::Metadata, Error> {
with_file!(self, StorageIoOperation::Metadata, |file| file
.as_ref()
.metadata())
with_file!(self, StorageIoOperation::Metadata, |file_guard| file_guard
.with_std_file(|std_file| std_file.metadata()))
}
/// Helper function internal to `VirtualFile` that looks up the underlying File,
@@ -526,7 +524,6 @@ impl VirtualFile {
std::io::Error::new(std::io::ErrorKind::Other, system)
}
})?;
let file = File::from(file);
file
});
@@ -553,9 +550,8 @@ impl VirtualFile {
self.pos = offset;
}
SeekFrom::End(offset) => {
self.pos = with_file!(self, StorageIoOperation::Seek, |file| file
.as_ref()
.seek(SeekFrom::End(offset)))?
self.pos = with_file!(self, StorageIoOperation::Seek, |file_guard| file_guard
.with_std_file(|std_file| std_file.seek(SeekFrom::End(offset))))?
}
SeekFrom::Current(offset) => {
let pos = self.pos as i128 + offset as i128;
@@ -620,13 +616,7 @@ impl VirtualFile {
buf: write_guard,
init_up_to: 0,
};
let owned_fd = unsafe { OwnedFd::from_raw_fd(file_guard.as_ref().as_raw_fd()) };
let guard = scopeguard::guard(file_guard, |_| {
panic!("must not drop future while operation is ongoing (todo: pass file_guard to tokio_epoll_uring to aovid this)")
});
let ((owned_fd, buf), res) = system.read(owned_fd, offset, buf).await;
let _ = OwnedFd::into_raw_fd(owned_fd);
let _ = scopeguard::ScopeGuard::into_inner(guard);
let ((_, buf), res) = system.read(file_guard, offset, buf).await;
let PageWriteGuardBuf {
buf: write_guard,
init_up_to,
@@ -686,9 +676,9 @@ impl VirtualFile {
}
async fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
let result = with_file!(self, StorageIoOperation::Write, |file| file
.as_ref()
.write_at(buf, offset));
let result = with_file!(self, StorageIoOperation::Write, |file_guard| {
file_guard.with_std_file(|std_file| std_file.write_at(buf, offset))
});
if let Ok(size) = result {
STORAGE_IO_SIZE
.with_label_values(&["write", &self.tenant_id, &self.timeline_id])
@@ -702,14 +692,36 @@ struct FileGuard<'a> {
slot_guard: RwLockReadGuard<'a, SlotInner>,
}
impl<'a> AsRef<File> for FileGuard<'a> {
fn as_ref(&self) -> &File {
impl<'a> AsRef<OwnedFd> for FileGuard<'a> {
fn as_ref(&self) -> &OwnedFd {
// This unwrap is safe because we only create `FileGuard`s
// if we know that the file is Some.
self.slot_guard.file.as_ref().unwrap()
}
}
impl<'a> FileGuard<'a> {
// TODO: switch to tokio-epoll-uring native operations.
#[deprecated]
fn with_std_file<F, R>(&mut self, with: F) -> R
where
F: FnOnce(&mut File) -> R,
{
// SAFETY:
// - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`.
// - &mut usage below: `self` is `&mut`, hence this call is the only task/thread that has control over the underlying fd
let mut file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) };
with(&mut file)
}
}
impl tokio_epoll_uring::IoFd for FileGuard<'static> {
unsafe fn as_fd(&self) -> RawFd {
let owned_fd: &OwnedFd = self.as_ref();
owned_fd.as_raw_fd()
}
}
#[cfg(test)]
impl VirtualFile {
pub(crate) async fn read_blk(