From 61fac1ab0b437340614e56dfb09d083f6d75edde Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 29 Aug 2023 19:13:38 +0000 Subject: [PATCH] CP: use hacked-together open_at for async VirtualFile open calls instead of spawn_blocking This makes Delta/Image ::load fns fully tokio-epoll-uring --- Cargo.lock | 6 +- pageserver/Cargo.toml | 1 + pageserver/src/tenant/ephemeral_file.rs | 13 +- .../src/tenant/storage_layer/image_layer.rs | 10 +- pageserver/src/virtual_file.rs | 115 +++++++++++++++--- 5 files changed, 113 insertions(+), 32 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9f4f7e45ad..3f954385f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2078,9 +2078,9 @@ dependencies = [ [[package]] name = "io-uring" -version = "0.6.1" +version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "141a0f4546a50b2ed637c7a6df0d7dff45c9f41523254996764461c8ae0d9424" +checksum = "460648e47a07a43110fbfa2e0b14afb2be920093c31e5dccc50e49568e099762" dependencies = [ "bitflags", "libc", @@ -4596,7 +4596,7 @@ name = "tokio-epoll-uring" version = "0.1.0" dependencies = [ "futures", - "io-uring 0.6.1", + "io-uring 0.6.2", "libc", "once_cell", "scopeguard", diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 34a159660d..3ddc986021 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -84,6 +84,7 @@ enumset.workspace = true strum.workspace = true strum_macros.workspace = true tokio-epoll-uring = { path = "/home/admin/tokio-epoll-uring/tokio-epoll-uring" } +#tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "problame/hacky-openat" } [dev-dependencies] criterion.workspace = true diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index 7be2e8be6d..61bffdc575 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -44,11 +44,11 @@ impl EphemeralFile { "ephemeral-{filename_disambiguator}" 
))); - let file = VirtualFile::open_with_options( - &filename, - OpenOptions::new().read(true).write(true).create(true), - ) - .await?; + let file = { + let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new(); + options.read(true).write(true).create(true); + VirtualFile::open_with_options_async(&filename, options).await? + }; Ok(EphemeralFile { page_cache_file_id: page_cache::next_file_id(), @@ -89,7 +89,8 @@ impl EphemeralFile { return Ok(BlockLease::PageReadGuard(guard)) } page_cache::ReadBufResult::NotFound(mut write_guard) => { - let write_guard = self.file + let write_guard = self + .file .read_exact_at(write_guard, blknum as u64 * PAGE_SZ as u64) .await?; let read_guard = write_guard.mark_valid(); diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index e8e26ebba0..cfc45cd2e1 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -559,11 +559,11 @@ impl ImageLayerWriterInner { }, ); info!("new image layer {path}"); - let mut file = VirtualFile::open_with_options( - &path, - std::fs::OpenOptions::new().write(true).create_new(true), - ) - .await?; + let mut file = { + let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new(); + options.write(true).create_new(true); + VirtualFile::open_with_options_async(&path, options).await? + }; // make room for the header block file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?; let blob_writer = BlobWriter::new(file, PAGE_SZ as u64); diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index c94f3801b6..c76fd7dd15 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -55,7 +55,7 @@ pub struct VirtualFile { /// opened, in the VirtualFile::create() function, and strip the flag before /// storing it here. 
pub path: Utf8PathBuf, - open_options: OpenOptions, + open_options: tokio_epoll_uring::ops::open_at::OpenOptions, // These are strings becase we only use them for metrics, and those expect strings. // It makes no sense for us to constantly turn the `TimelineId` and `TenantId` into @@ -237,17 +237,17 @@ macro_rules! with_file { impl VirtualFile { /// Open a file in read-only mode. Like File::open. pub async fn open(path: &Utf8Path) -> Result { - Self::open_with_options(path, OpenOptions::new().read(true)).await + let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new(); + options.read(true); + Self::open_with_options_async(path, options).await } /// Create a new file for writing. If the file exists, it will be truncated. /// Like File::create. pub async fn create(path: &Utf8Path) -> Result { - Self::open_with_options( - path, - OpenOptions::new().write(true).create(true).truncate(true), - ) - .await + let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new(); + options.write(true).create(true).truncate(true); + Self::open_with_options_async(path, options).await } /// Open a file with given options. @@ -255,6 +255,7 @@ impl VirtualFile { /// Note: If any custom flags were set in 'open_options' through OpenOptionsExt, /// they will be applied also when the file is subsequently re-opened, not only /// on the first time. Make sure that's sane! + #[cfg(test)] pub async fn open_with_options( path: &Utf8Path, open_options: &OpenOptions, @@ -317,16 +318,17 @@ impl VirtualFile { Err(e) if e.kind() == std::io::ErrorKind::NotFound => {} Err(e) => return Err(CrashsafeOverwriteError::RemovePreviousTempfile(e)), } - let mut file = Self::open_with_options( - tmp_path, - OpenOptions::new() + let mut file = { + let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new(); + options .write(true) // Use `create_new` so that, if we race with ourselves or something else, // we bail out instead of causing damage. 
- .create_new(true), - ) - .await - .map_err(CrashsafeOverwriteError::CreateTempfile)?; + .create_new(true); + Self::open_with_options_async(tmp_path, options) + .await + .map_err(CrashsafeOverwriteError::CreateTempfile)? + }; file.write_all(content) .await .map_err(CrashsafeOverwriteError::WriteContents)?; @@ -342,10 +344,13 @@ impl VirtualFile { // the current `find_victim_slot` impl might pick the same slot for both // VirtualFile., and it eventually does a blocking write lock instead of // try_lock. - let final_parent_dirfd = - Self::open_with_options(final_path_parent, OpenOptions::new().read(true)) + let final_parent_dirfd = { + let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new(); + options.read(true); + Self::open_with_options_async(final_path_parent, options) .await - .map_err(CrashsafeOverwriteError::OpenFinalPathParentDir)?; + .map_err(CrashsafeOverwriteError::OpenFinalPathParentDir)? + }; final_parent_dirfd .sync_all() .await @@ -353,6 +358,66 @@ impl VirtualFile { Ok(()) } + /// Open a file with given options. + /// + /// Note: If any custom flags were set in 'open_options' through OpenOptionsExt, + /// they will be applied also when the file is subsequently re-opened, not only + /// on the first time. Make sure that's sane! 
+    pub async fn open_with_options_async(
+        path: &Utf8Path,
+        open_options: tokio_epoll_uring::ops::open_at::OpenOptions,
+    ) -> Result<VirtualFile, std::io::Error> {
+        // Derive the tenant / timeline labels from the path: when the fifth
+        // component from the end is "tenants", the components at offsets 4 and 2
+        // from the end are used; any other path is labelled "*".
+        let path_str = path.to_string();
+        let parts = path_str.split('/').collect::<Vec<&str>>();
+        let (tenant_id, timeline_id) = if parts.len() > 5 && parts[parts.len() - 5] == "tenants" {
+            (
+                parts[parts.len() - 4].to_string(),
+                parts[parts.len() - 2].to_string(),
+            )
+        } else {
+            ("*".to_string(), "*".to_string())
+        };
+        let (handle, mut slot_guard) = get_open_files().find_victim_slot().await;
+
+        // Open the physical file via the thread-local tokio-epoll-uring system
+        // instead of a blocking `std::fs` open.
+        let system = tokio_epoll_uring::thread_local_system().await;
+        let fd: OwnedFd = system
+            .open(path, &open_options)
+            .await
+            .map_err(|e| match e {
+                // `Op` already carries a `std::io::Error`; return it unchanged.
+                tokio_epoll_uring::Error::Op(e) => e,
+                // `System` is wrapped so that callers only ever see `std::io::Error`.
+                tokio_epoll_uring::Error::System(system) => {
+                    std::io::Error::new(std::io::ErrorKind::Other, system)
+                }
+            })?;
+        let file = File::from(fd);
+
+        // Strip all options other than read and write.
+        //
+        // It would perhaps be nicer to check just for the read and write flags
+        // explicitly, but OpenOptions doesn't contain any functions to read flags,
+        // only to set them.
+        let mut reopen_options = open_options;
+        reopen_options.create(false);
+        reopen_options.create_new(false);
+        reopen_options.truncate(false);
+
+        let vfile = VirtualFile {
+            handle: RwLock::new(handle),
+            pos: 0,
+            path: path.to_path_buf(),
+            open_options: reopen_options,
+            tenant_id,
+            timeline_id,
+        };
+
+        // Store the opened file in the victim slot. Without this nothing keeps
+        // `file` alive: it is dropped (and closed) when this function returns and
+        // `slot_guard` is never written to, so the open above is wasted.
+        // NOTE(review): the `file` field name on the slot guard is assumed here;
+        // confirm against the slot type's definition.
+        slot_guard.file.replace(file);
+
+        Ok(vfile)
+    }
+
     /// Call File::sync_all() on the underlying File.
pub async fn sync_all(&self) -> Result<(), Error> { with_file!(self, StorageIoOperation::Fsync, |file| file @@ -415,7 +480,21 @@ impl VirtualFile { let (handle, mut slot_guard) = open_files.find_victim_slot().await; // Open the physical file - let file = observe_duration!(StorageIoOperation::Open, self.open_options.open(&self.path))?; + let file = { + let system = tokio_epoll_uring::thread_local_system().await; + let file: OwnedFd = + system + .open(&self.path, &self.open_options) + .await + .map_err(|e| match e { + tokio_epoll_uring::Error::Op(e) => e, + tokio_epoll_uring::Error::System(system) => { + std::io::Error::new(std::io::ErrorKind::Other, system) + } + })?; + let file = File::from(file); + file + }; // Store the File in the slot and update the handle in the VirtualFile // to point to it.