CP: use hacked-together open_at for async VirtualFile open calls instead of spawn_blocking

This makes Delta/Image ::load fns fully tokio-epoll-uring
This commit is contained in:
Christian Schwarz
2023-08-29 19:13:38 +00:00
parent 8d3e8078f7
commit 61fac1ab0b
5 changed files with 113 additions and 32 deletions

6
Cargo.lock generated
View File

@@ -2078,9 +2078,9 @@ dependencies = [
[[package]]
name = "io-uring"
version = "0.6.1"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "141a0f4546a50b2ed637c7a6df0d7dff45c9f41523254996764461c8ae0d9424"
checksum = "460648e47a07a43110fbfa2e0b14afb2be920093c31e5dccc50e49568e099762"
dependencies = [
"bitflags",
"libc",
@@ -4596,7 +4596,7 @@ name = "tokio-epoll-uring"
version = "0.1.0"
dependencies = [
"futures",
"io-uring 0.6.1",
"io-uring 0.6.2",
"libc",
"once_cell",
"scopeguard",

View File

@@ -84,6 +84,7 @@ enumset.workspace = true
strum.workspace = true
strum_macros.workspace = true
tokio-epoll-uring = { path = "/home/admin/tokio-epoll-uring/tokio-epoll-uring" }
#tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "problame/hacky-openat" }
[dev-dependencies]
criterion.workspace = true

View File

@@ -44,11 +44,11 @@ impl EphemeralFile {
"ephemeral-{filename_disambiguator}"
)));
let file = VirtualFile::open_with_options(
&filename,
OpenOptions::new().read(true).write(true).create(true),
)
.await?;
let file = {
let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new();
options.read(true).write(true).create(true);
VirtualFile::open_with_options_async(&filename, options).await?
};
Ok(EphemeralFile {
page_cache_file_id: page_cache::next_file_id(),
@@ -89,7 +89,8 @@ impl EphemeralFile {
return Ok(BlockLease::PageReadGuard(guard))
}
page_cache::ReadBufResult::NotFound(mut write_guard) => {
let write_guard = self.file
let write_guard = self
.file
.read_exact_at(write_guard, blknum as u64 * PAGE_SZ as u64)
.await?;
let read_guard = write_guard.mark_valid();

View File

@@ -559,11 +559,11 @@ impl ImageLayerWriterInner {
},
);
info!("new image layer {path}");
let mut file = VirtualFile::open_with_options(
&path,
std::fs::OpenOptions::new().write(true).create_new(true),
)
.await?;
let mut file = {
let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new();
options.write(true).create_new(true);
VirtualFile::open_with_options_async(&path, options).await?
};
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);

View File

@@ -55,7 +55,7 @@ pub struct VirtualFile {
/// opened, in the VirtualFile::create() function, and strip the flag before
/// storing it here.
pub path: Utf8PathBuf,
open_options: OpenOptions,
open_options: tokio_epoll_uring::ops::open_at::OpenOptions,
// These are strings becase we only use them for metrics, and those expect strings.
// It makes no sense for us to constantly turn the `TimelineId` and `TenantId` into
@@ -237,17 +237,17 @@ macro_rules! with_file {
impl VirtualFile {
/// Open a file in read-only mode. Like File::open.
pub async fn open(path: &Utf8Path) -> Result<VirtualFile, std::io::Error> {
Self::open_with_options(path, OpenOptions::new().read(true)).await
let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new();
options.read(true);
Self::open_with_options_async(path, options).await
}
/// Create a new file for writing. If the file exists, it will be truncated.
/// Like File::create.
pub async fn create(path: &Utf8Path) -> Result<VirtualFile, std::io::Error> {
Self::open_with_options(
path,
OpenOptions::new().write(true).create(true).truncate(true),
)
.await
let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new();
options.write(true).create(true).truncate(true);
Self::open_with_options_async(path, options).await
}
/// Open a file with given options.
@@ -255,6 +255,7 @@ impl VirtualFile {
/// Note: If any custom flags were set in 'open_options' through OpenOptionsExt,
/// they will be applied also when the file is subsequently re-opened, not only
/// on the first time. Make sure that's sane!
#[cfg(test)]
pub async fn open_with_options(
path: &Utf8Path,
open_options: &OpenOptions,
@@ -317,16 +318,17 @@ impl VirtualFile {
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
Err(e) => return Err(CrashsafeOverwriteError::RemovePreviousTempfile(e)),
}
let mut file = Self::open_with_options(
tmp_path,
OpenOptions::new()
let mut file = {
let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new();
options
.write(true)
// Use `create_new` so that, if we race with ourselves or something else,
// we bail out instead of causing damage.
.create_new(true),
)
.await
.map_err(CrashsafeOverwriteError::CreateTempfile)?;
.create_new(true);
Self::open_with_options_async(tmp_path, options)
.await
.map_err(CrashsafeOverwriteError::CreateTempfile)?
};
file.write_all(content)
.await
.map_err(CrashsafeOverwriteError::WriteContents)?;
@@ -342,10 +344,13 @@ impl VirtualFile {
// the current `find_victim_slot` impl might pick the same slot for both
// VirtualFile., and it eventually does a blocking write lock instead of
// try_lock.
let final_parent_dirfd =
Self::open_with_options(final_path_parent, OpenOptions::new().read(true))
let final_parent_dirfd = {
let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new();
options.read(true);
Self::open_with_options_async(final_path_parent, options)
.await
.map_err(CrashsafeOverwriteError::OpenFinalPathParentDir)?;
.map_err(CrashsafeOverwriteError::OpenFinalPathParentDir)?
};
final_parent_dirfd
.sync_all()
.await
@@ -353,6 +358,66 @@ impl VirtualFile {
Ok(())
}
/// Open a file with given options.
///
/// Note: If any custom flags were set in 'open_options' through OpenOptionsExt,
/// they will be applied also when the file is subsequently re-opened, not only
/// on the first time. Make sure that's sane!
pub async fn open_with_options_async(
path: &Utf8Path,
open_options: tokio_epoll_uring::ops::open_at::OpenOptions,
) -> Result<VirtualFile, std::io::Error> {
let path_str = path.to_string();
let parts = path_str.split('/').collect::<Vec<&str>>();
let tenant_id;
let timeline_id;
if parts.len() > 5 && parts[parts.len() - 5] == "tenants" {
tenant_id = parts[parts.len() - 4].to_string();
timeline_id = parts[parts.len() - 2].to_string();
} else {
tenant_id = "*".to_string();
timeline_id = "*".to_string();
}
let (handle, mut slot_guard) = get_open_files().find_victim_slot().await;
let file = {
let start = std::time::Instant::now();
let system = tokio_epoll_uring::thread_local_system().await;
let file: OwnedFd = system
.open(path, &open_options)
.await
.map_err(|e| match e {
tokio_epoll_uring::Error::Op(e) => e,
tokio_epoll_uring::Error::System(system) => {
std::io::Error::new(std::io::ErrorKind::Other, system)
}
})?;
let file = File::from(file);
file
};
// Strip all options other than read and write.
//
// It would perhaps be nicer to check just for the read and write flags
// explicitly, but OpenOptions doesn't contain any functions to read flags,
// only to set them.
let mut reopen_options = open_options;
reopen_options.create(false);
reopen_options.create_new(false);
reopen_options.truncate(false);
let vfile = VirtualFile {
handle: RwLock::new(handle),
pos: 0,
path: path.to_path_buf(),
open_options: reopen_options,
tenant_id,
timeline_id,
};
Ok(vfile)
}
/// Call File::sync_all() on the underlying File.
pub async fn sync_all(&self) -> Result<(), Error> {
with_file!(self, StorageIoOperation::Fsync, |file| file
@@ -415,7 +480,21 @@ impl VirtualFile {
let (handle, mut slot_guard) = open_files.find_victim_slot().await;
// Open the physical file
let file = observe_duration!(StorageIoOperation::Open, self.open_options.open(&self.path))?;
let file = {
let system = tokio_epoll_uring::thread_local_system().await;
let file: OwnedFd =
system
.open(&self.path, &self.open_options)
.await
.map_err(|e| match e {
tokio_epoll_uring::Error::Op(e) => e,
tokio_epoll_uring::Error::System(system) => {
std::io::Error::new(std::io::ErrorKind::Other, system)
}
})?;
let file = File::from(file);
file
};
// Store the File in the slot and update the handle in the VirtualFile
// to point to it.