diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs index 831f2c0744..9ed518980f 100644 --- a/pageserver/src/tenant/block_io.rs +++ b/pageserver/src/tenant/block_io.rs @@ -156,7 +156,9 @@ impl FileBlockReader { /// Read a page from the underlying file into given buffer. async fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> { assert!(buf.len() == PAGE_SZ); - self.file.read_exact_at(buf, blkno as u64 * PAGE_SZ as u64) + self.file + .read_exact_at(buf, blkno as u64 * PAGE_SZ as u64) + .await } /// Read a block. /// diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index 1edb86f857..4c5fe424f3 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -87,7 +87,8 @@ impl EphemeralFile { let buf: &mut [u8] = write_guard.deref_mut(); debug_assert_eq!(buf.len(), PAGE_SZ); self.file - .read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)?; + .read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64) + .await?; write_guard.mark_valid(); // Swap for read lock diff --git a/pageserver/src/tenant/manifest.rs b/pageserver/src/tenant/manifest.rs index fef66abd4f..acc2b709ff 100644 --- a/pageserver/src/tenant/manifest.rs +++ b/pageserver/src/tenant/manifest.rs @@ -156,6 +156,7 @@ impl Manifest { ) -> Result<(Self, Vec, ManifestPartiallyCorrupted), ManifestLoadError> { let mut buf = vec![]; file.read_exact_at(&mut buf, 0) + .await .map_err(ManifestLoadError::Io)?; // Read manifest header diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 89543442c0..e519b47bd0 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -348,7 +348,7 @@ impl VirtualFile { } // Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135 - pub fn read_exact_at(&self, mut buf: &mut [u8], mut offset: u64) -> Result<(), Error> { + pub async fn read_exact_at(&self, mut buf: &mut [u8], mut offset: u64) -> Result<(), Error> { while !buf.is_empty() { match self.read_at(buf, offset) { Ok(0) => { @@ -513,7 +513,6 @@ mod tests { use rand::thread_rng; use rand::Rng; use std::sync::Arc; - use std::thread; enum MaybeVirtualFile { VirtualFile(VirtualFile), @@ -521,9 +520,9 @@ mod tests { } impl MaybeVirtualFile { - fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> Result<(), Error> { + async fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> Result<(), Error> { match self { - MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset), + MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset).await, MaybeVirtualFile::File(file) => file.read_exact_at(buf, offset), } } @@ -569,7 +568,7 @@ mod tests { async fn read_string_at(&mut self, pos: u64, len: usize) -> Result { let mut buf = Vec::new(); buf.resize(len, 0); - self.read_exact_at(&mut buf, pos)?; + self.read_exact_at(&mut buf, pos).await?; Ok(String::from_utf8(buf).unwrap()) } } @@ -723,28 +722,22 @@ mod tests { let files = Arc::new(files); // Launch many threads, and use the virtual files concurrently in random order. - let mut threads = Vec::new(); - for threadno in 0..THREADS { - let builder = - thread::Builder::new().name(format!("test_vfile_concurrency thread {}", threadno)); - + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(THREADS) + .thread_name("test_vfile_concurrency thread") + .build() + .unwrap(); + for _threadno in 0..THREADS { let files = files.clone(); - let thread = builder - .spawn(move || { - let mut buf = [0u8; SIZE]; - let mut rng = rand::thread_rng(); - for _ in 1..1000 { - let f = &files[rng.gen_range(0..files.len())]; - f.read_exact_at(&mut buf, 0).unwrap(); - assert!(buf == SAMPLE); - } - }) - .unwrap(); - threads.push(thread); - } - - for thread in threads { - thread.join().unwrap(); + rt.spawn(async move { + let mut buf = [0u8; SIZE]; + let mut rng = rand::rngs::OsRng; + for _ in 1..1000 { + let f = &files[rng.gen_range(0..files.len())]; + f.read_exact_at(&mut buf, 0).await.unwrap(); + assert!(buf == SAMPLE); + } + }); } Ok(())