diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 45c3e19cfc..364a94c982 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -562,7 +562,18 @@ impl VirtualFile { B: IoBufMut + Send, { let (buf, res) = - read_exact_at_impl(buf, offset, |buf, offset| self.read_at(buf, offset)).await; + read_exact_at_impl(buf, offset, None, |buf, offset| self.read_at(buf, offset)).await; + res.map(|()| buf) + } + + pub async fn read_exact_at_n(&self, buf: B, offset: u64, count: usize) -> Result + where + B: IoBufMut + Send, + { + let (buf, res) = read_exact_at_impl(buf, offset, Some(count), |buf, offset| { + self.read_at(buf, offset) + }) + .await; res.map(|()| buf) } @@ -696,6 +707,7 @@ impl VirtualFile { pub async fn read_exact_at_impl( buf: B, mut offset: u64, + count: Option, mut read_at: F, ) -> (B, std::io::Result<()>) where @@ -703,7 +715,15 @@ where F: FnMut(tokio_epoll_uring::Slice, u64) -> Fut, Fut: std::future::Future, std::io::Result)>, { - let mut buf: tokio_epoll_uring::Slice = buf.slice_full(); // includes all the uninitialized memory + let mut buf: tokio_epoll_uring::Slice = match count { + Some(count) => { + assert!(count <= buf.bytes_total()); + assert!(count > 0); + buf.slice(..count) // may include uninitialized memory + } + None => buf.slice_full(), // includes all the uninitialized memory + }; + while buf.bytes_total() != 0 { let res; (buf, res) = read_at(buf, offset).await; @@ -793,7 +813,7 @@ mod test_read_exact_at_impl { result: Ok(vec![b'a', b'b', b'c', b'd', b'e']), }]), })); - let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| { + let (buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| { let mock_read_at = Arc::clone(&mock_read_at); async move { mock_read_at.lock().await.read_at(buf, offset).await } }) @@ -802,13 +822,33 @@ mod test_read_exact_at_impl { assert_eq!(buf, vec![b'a', b'b', b'c', b'd', b'e']); } + #[tokio::test] + async fn test_with_count() { + let buf = Vec::with_capacity(5); + let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt { + expectations: VecDeque::from(vec![Expectation { + offset: 0, + bytes_total: 3, + result: Ok(vec![b'a', b'b', b'c']), + }]), + })); + + let (buf, res) = read_exact_at_impl(buf, 0, Some(3), |buf, offset| { + let mock_read_at = Arc::clone(&mock_read_at); + async move { mock_read_at.lock().await.read_at(buf, offset).await } + }) + .await; + assert!(res.is_ok()); + assert_eq!(buf, vec![b'a', b'b', b'c']); + } + #[tokio::test] async fn test_empty_buf_issues_no_syscall() { let buf = Vec::new(); let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt { expectations: VecDeque::new(), })); - let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| { + let (_buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| { let mock_read_at = Arc::clone(&mock_read_at); async move { mock_read_at.lock().await.read_at(buf, offset).await } }) @@ -833,7 +873,7 @@ mod test_read_exact_at_impl { }, ]), })); - let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| { + let (buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| { let mock_read_at = Arc::clone(&mock_read_at); async move { mock_read_at.lock().await.read_at(buf, offset).await } }) @@ -864,7 +904,7 @@ mod test_read_exact_at_impl { }, ]), })); - let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| { + let (_buf, res) = read_exact_at_impl(buf, 0, None, |buf, offset| { let mock_read_at = Arc::clone(&mock_read_at); async move { mock_read_at.lock().await.read_at(buf, offset).await } })