From f417db52e0db2c930d646b8bfee0b2a1addc193e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Thu, 7 Sep 2023 15:48:30 +0200 Subject: [PATCH] wip --- pageserver/src/virtual_file.rs | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 452f384f7c..f437deece7 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -12,6 +12,7 @@ //! use crate::metrics::{STORAGE_IO_SIZE, STORAGE_IO_TIME}; use crate::tenant::TENANTS_SEGMENT_NAME; +use futures::Future; use once_cell::sync::OnceCell; use std::fs::{self, File, OpenOptions}; use std::io::{Error, ErrorKind, Seek, SeekFrom}; @@ -330,19 +331,22 @@ impl VirtualFile { /// Call File::sync_all() on the underlying File. pub async fn sync_all(&self) -> Result<(), Error> { - self.with_file("fsync", |file| file.sync_all()).await? + self.with_file("fsync", |file| async move { file.sync_all() }) + .await? } pub async fn metadata(&self) -> Result { - self.with_file("metadata", |file| file.metadata()).await? + self.with_file("metadata", |file| async move { file.metadata() }) + .await? } /// Helper function that looks up the underlying File for this VirtualFile, /// opening it and evicting some other File if necessary. It calls 'func' /// with the physical File. - async fn with_file(&self, op: &str, mut func: F) -> Result + async fn with_file<'a, F, R, FR>(&self, _op: &str, func: F) -> Result where - F: FnMut(&File) -> R, + F: FnOnce(&'a File) -> FR, + FR: Future + 'a, { let open_files = get_open_files(); @@ -363,9 +367,7 @@ impl VirtualFile { if let Some(file) = &slot_guard.file { // Found a cached file descriptor. slot.recently_used.store(true, Ordering::Relaxed); - return Ok(STORAGE_IO_TIME - .with_label_values(&[op]) - .observe_closure_duration(|| func(file))); + return Ok(func(file).await); } } } @@ -395,9 +397,7 @@ impl VirtualFile { .observe_closure_duration(|| self.open_options.open(&self.path))?; // Perform the requested operation on it - let result = STORAGE_IO_TIME - .with_label_values(&[op]) - .observe_closure_duration(|| func(&file)); + let result = func(&file).await; // Store the File in the slot and update the handle in the VirtualFile // to point to it. @@ -421,7 +421,9 @@ impl VirtualFile { } SeekFrom::End(offset) => { self.pos = self - .with_file("seek", |mut file| file.seek(SeekFrom::End(offset))) + .with_file("seek", |mut file| async move { + file.seek(SeekFrom::End(offset)) + }) .await?? } SeekFrom::Current(offset) => { @@ -511,7 +513,7 @@ impl VirtualFile { pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result { let result = self - .with_file("read", |file| file.read_at(buf, offset)) + .with_file("read", |file| async move { file.read_at(buf, offset) }) .await?; if let Ok(size) = result { STORAGE_IO_SIZE @@ -523,7 +525,7 @@ impl VirtualFile { async fn write_at(&self, buf: &[u8], offset: u64) -> Result { let result = self - .with_file("write", |file| file.write_at(buf, offset)) + .with_file("write", |file| async move { file.write_at(buf, offset) }) .await?; if let Ok(size) = result { STORAGE_IO_SIZE