From ba13f2a90a19e32da602326990577aa32b789181 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 21 Jun 2024 09:44:46 +0000 Subject: [PATCH] async closure workaround for virtual file --- pageserver/src/virtual_file.rs | 57 ++++++++++++++++++++++++++-------- 1 file changed, 44 insertions(+), 13 deletions(-) diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index f8419b630d..b5a82b84da 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -594,10 +594,8 @@ impl VirtualFile { where B: IoBufMut + Send, { - let (buf, res) = read_exact_at_impl(buf, offset, None, |buf, offset| { - self.read_at(buf, offset, ctx) - }) - .await; + let (buf, res) = + read_exact_at_impl(buf, offset, None, VirtualFileReadAt { file: self, ctx }).await; res.map(|()| buf) } @@ -611,9 +609,12 @@ impl VirtualFile { where B: IoBufMut + Send, { - let (buf, res) = read_exact_at_impl(buf, offset, Some(count), |buf, offset| { - self.read_at(buf, offset, ctx) - }) + let (buf, res) = read_exact_at_impl( + buf, + offset, + Some(count), + VirtualFileReadAt { file: self, ctx }, + ) .await; res.map(|()| buf) } @@ -780,8 +781,35 @@ impl VirtualFile { } } +trait AsyncClosureThatDoesReadAt { + async fn call( + &mut self, + buf: tokio_epoll_uring::Slice, + offset: u64, + ) -> (tokio_epoll_uring::Slice, std::io::Result) + where + B: IoBufMut + Send; +} + +struct VirtualFileReadAt<'a> { + file: &'a VirtualFile, + ctx: &'a mut RequestContext, +} +impl<'a> AsyncClosureThatDoesReadAt for VirtualFileReadAt<'a> { + async fn call( + &mut self, + buf: tokio_epoll_uring::Slice, + offset: u64, + ) -> (tokio_epoll_uring::Slice, std::io::Result) + where + B: IoBufMut + Send, + { + self.file.read_at(buf, offset, self.ctx).await + } +} + // Adapted from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135 -pub async fn read_exact_at_impl( +pub async fn read_exact_at_impl( buf: B, mut offset: u64, count: Option, @@ -789,8 +817,7 @@ pub async fn read_exact_at_impl( ) -> (B, std::io::Result<()>) where B: IoBufMut + Send, - F: FnMut(tokio_epoll_uring::Slice, u64) -> Fut, - Fut: std::future::Future, std::io::Result)>, + F: AsyncClosureThatDoesReadAt, { let mut buf: tokio_epoll_uring::Slice = match count { Some(count) => { @@ -802,8 +829,8 @@ where }; while buf.bytes_total() != 0 { - let res; - (buf, res) = read_at(buf, offset).await; + let res: std::io::Result; + (buf, res) = read_at.call(buf, offset).await; match res { Ok(0) => break, Ok(n) => { @@ -1058,7 +1085,11 @@ impl VirtualFile { Ok(crate::tenant::block_io::BlockLease::Vec(buf)) } - async fn read_to_end(&mut self, buf: &mut Vec, ctx: &mut RequestContext) -> Result<(), Error> { + async fn read_to_end( + &mut self, + buf: &mut Vec, + ctx: &mut RequestContext, + ) -> Result<(), Error> { let mut tmp = vec![0; 128]; loop { let res;