mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 17:32:56 +00:00
Make read_exact_at async fn
This commit is contained in:
@@ -156,7 +156,9 @@ impl FileBlockReader<VirtualFile> {
|
||||
/// Read a page from the underlying file into given buffer.
|
||||
async fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> {
|
||||
assert!(buf.len() == PAGE_SZ);
|
||||
self.file.read_exact_at(buf, blkno as u64 * PAGE_SZ as u64)
|
||||
self.file
|
||||
.read_exact_at(buf, blkno as u64 * PAGE_SZ as u64)
|
||||
.await
|
||||
}
|
||||
/// Read a block.
|
||||
///
|
||||
|
||||
@@ -87,7 +87,8 @@ impl EphemeralFile {
|
||||
let buf: &mut [u8] = write_guard.deref_mut();
|
||||
debug_assert_eq!(buf.len(), PAGE_SZ);
|
||||
self.file
|
||||
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)?;
|
||||
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)
|
||||
.await?;
|
||||
write_guard.mark_valid();
|
||||
|
||||
// Swap for read lock
|
||||
|
||||
@@ -156,6 +156,7 @@ impl Manifest {
|
||||
) -> Result<(Self, Vec<Operation>, ManifestPartiallyCorrupted), ManifestLoadError> {
|
||||
let mut buf = vec![];
|
||||
file.read_exact_at(&mut buf, 0)
|
||||
.await
|
||||
.map_err(ManifestLoadError::Io)?;
|
||||
|
||||
// Read manifest header
|
||||
|
||||
@@ -348,7 +348,7 @@ impl VirtualFile {
|
||||
}
|
||||
|
||||
// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
|
||||
pub fn read_exact_at(&self, mut buf: &mut [u8], mut offset: u64) -> Result<(), Error> {
|
||||
pub async fn read_exact_at(&self, mut buf: &mut [u8], mut offset: u64) -> Result<(), Error> {
|
||||
while !buf.is_empty() {
|
||||
match self.read_at(buf, offset) {
|
||||
Ok(0) => {
|
||||
@@ -513,7 +513,6 @@ mod tests {
|
||||
use rand::thread_rng;
|
||||
use rand::Rng;
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
|
||||
enum MaybeVirtualFile {
|
||||
VirtualFile(VirtualFile),
|
||||
@@ -521,9 +520,9 @@ mod tests {
|
||||
}
|
||||
|
||||
impl MaybeVirtualFile {
|
||||
fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> Result<(), Error> {
|
||||
async fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> Result<(), Error> {
|
||||
match self {
|
||||
MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset),
|
||||
MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset).await,
|
||||
MaybeVirtualFile::File(file) => file.read_exact_at(buf, offset),
|
||||
}
|
||||
}
|
||||
@@ -569,7 +568,7 @@ mod tests {
|
||||
async fn read_string_at(&mut self, pos: u64, len: usize) -> Result<String, Error> {
|
||||
let mut buf = Vec::new();
|
||||
buf.resize(len, 0);
|
||||
self.read_exact_at(&mut buf, pos)?;
|
||||
self.read_exact_at(&mut buf, pos).await?;
|
||||
Ok(String::from_utf8(buf).unwrap())
|
||||
}
|
||||
}
|
||||
@@ -723,28 +722,22 @@ mod tests {
|
||||
let files = Arc::new(files);
|
||||
|
||||
// Launch many threads, and use the virtual files concurrently in random order.
|
||||
let mut threads = Vec::new();
|
||||
for threadno in 0..THREADS {
|
||||
let builder =
|
||||
thread::Builder::new().name(format!("test_vfile_concurrency thread {}", threadno));
|
||||
|
||||
let rt = tokio::runtime::Builder::new_multi_thread()
|
||||
.worker_threads(THREADS)
|
||||
.thread_name("test_vfile_concurrency thread")
|
||||
.build()
|
||||
.unwrap();
|
||||
for _threadno in 0..THREADS {
|
||||
let files = files.clone();
|
||||
let thread = builder
|
||||
.spawn(move || {
|
||||
let mut buf = [0u8; SIZE];
|
||||
let mut rng = rand::thread_rng();
|
||||
for _ in 1..1000 {
|
||||
let f = &files[rng.gen_range(0..files.len())];
|
||||
f.read_exact_at(&mut buf, 0).unwrap();
|
||||
assert!(buf == SAMPLE);
|
||||
}
|
||||
})
|
||||
.unwrap();
|
||||
threads.push(thread);
|
||||
}
|
||||
|
||||
for thread in threads {
|
||||
thread.join().unwrap();
|
||||
rt.spawn(async move {
|
||||
let mut buf = [0u8; SIZE];
|
||||
let mut rng = rand::rngs::OsRng;
|
||||
for _ in 1..1000 {
|
||||
let f = &files[rng.gen_range(0..files.len())];
|
||||
f.read_exact_at(&mut buf, 0).await.unwrap();
|
||||
assert!(buf == SAMPLE);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
Reference in New Issue
Block a user