mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-26 23:59:58 +00:00
integrate tokio-epoll-uring as alternative VirtualFile IO engine (#5824)
This commit is contained in:
committed by
GitHub
parent
d36623ad74
commit
918b03b3b0
@@ -11,18 +11,28 @@
|
||||
//! src/backend/storage/file/fd.c
|
||||
//!
|
||||
use crate::metrics::{StorageIoOperation, STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC};
|
||||
|
||||
use crate::page_cache::PageWriteGuard;
|
||||
use crate::tenant::TENANTS_SEGMENT_NAME;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use once_cell::sync::OnceCell;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use std::fs::{self, File, OpenOptions};
|
||||
use std::fs::{self, File};
|
||||
use std::io::{Error, ErrorKind, Seek, SeekFrom};
|
||||
use tokio_epoll_uring::IoBufMut;
|
||||
|
||||
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
|
||||
use std::os::unix::fs::FileExt;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
use tokio::time::Instant;
|
||||
use utils::fs_ext;
|
||||
|
||||
mod io_engine;
|
||||
mod open_options;
|
||||
pub use io_engine::IoEngineKind;
|
||||
pub(crate) use open_options::*;
|
||||
|
||||
///
|
||||
/// A virtual file descriptor. You can use this just like std::fs::File, but internally
|
||||
/// the underlying file is closed if the system is low on file descriptors,
|
||||
@@ -106,7 +116,38 @@ struct SlotInner {
|
||||
tag: u64,
|
||||
|
||||
/// the underlying file
|
||||
file: Option<File>,
|
||||
file: Option<OwnedFd>,
|
||||
}
|
||||
|
||||
/// Impl of [`tokio_epoll_uring::IoBuf`] and [`tokio_epoll_uring::IoBufMut`] for [`PageWriteGuard`].
|
||||
struct PageWriteGuardBuf {
|
||||
page: PageWriteGuard<'static>,
|
||||
init_up_to: usize,
|
||||
}
|
||||
// Safety: the [`PageWriteGuard`] gives us exclusive ownership of the page cache slot,
|
||||
// and the location remains stable even if [`Self`] or the [`PageWriteGuard`] is moved.
|
||||
unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf {
|
||||
fn stable_ptr(&self) -> *const u8 {
|
||||
self.page.as_ptr()
|
||||
}
|
||||
fn bytes_init(&self) -> usize {
|
||||
self.init_up_to
|
||||
}
|
||||
fn bytes_total(&self) -> usize {
|
||||
self.page.len()
|
||||
}
|
||||
}
|
||||
// Safety: see above, plus: the ownership of [`PageWriteGuard`] means exclusive access,
|
||||
// hence it's safe to hand out the `stable_mut_ptr()`.
|
||||
unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf {
|
||||
fn stable_mut_ptr(&mut self) -> *mut u8 {
|
||||
self.page.as_mut_ptr()
|
||||
}
|
||||
|
||||
unsafe fn set_init(&mut self, pos: usize) {
|
||||
assert!(pos <= self.page.len());
|
||||
self.init_up_to = pos;
|
||||
}
|
||||
}
|
||||
|
||||
impl OpenFiles {
|
||||
@@ -274,6 +315,10 @@ macro_rules! with_file {
|
||||
let $ident = $this.lock_file().await?;
|
||||
observe_duration!($op, $($body)*)
|
||||
}};
|
||||
($this:expr, $op:expr, | mut $ident:ident | $($body:tt)*) => {{
|
||||
let mut $ident = $this.lock_file().await?;
|
||||
observe_duration!($op, $($body)*)
|
||||
}};
|
||||
}
|
||||
|
||||
impl VirtualFile {
|
||||
@@ -326,7 +371,9 @@ impl VirtualFile {
|
||||
// NB: there is also StorageIoOperation::OpenAfterReplace which is for the case
|
||||
// where our caller doesn't get to use the returned VirtualFile before its
|
||||
// slot gets re-used by someone else.
|
||||
let file = observe_duration!(StorageIoOperation::Open, open_options.open(path))?;
|
||||
let file = observe_duration!(StorageIoOperation::Open, {
|
||||
open_options.open(path.as_std_path()).await?
|
||||
});
|
||||
|
||||
// Strip all options other than read and write.
|
||||
//
|
||||
@@ -395,15 +442,13 @@ impl VirtualFile {
|
||||
|
||||
/// Call File::sync_all() on the underlying File.
|
||||
pub async fn sync_all(&self) -> Result<(), Error> {
|
||||
with_file!(self, StorageIoOperation::Fsync, |file| file
|
||||
.as_ref()
|
||||
.sync_all())
|
||||
with_file!(self, StorageIoOperation::Fsync, |file_guard| file_guard
|
||||
.with_std_file(|std_file| std_file.sync_all()))
|
||||
}
|
||||
|
||||
pub async fn metadata(&self) -> Result<fs::Metadata, Error> {
|
||||
with_file!(self, StorageIoOperation::Metadata, |file| file
|
||||
.as_ref()
|
||||
.metadata())
|
||||
with_file!(self, StorageIoOperation::Metadata, |file_guard| file_guard
|
||||
.with_std_file(|std_file| std_file.metadata()))
|
||||
}
|
||||
|
||||
/// Helper function internal to `VirtualFile` that looks up the underlying File,
|
||||
@@ -412,7 +457,7 @@ impl VirtualFile {
|
||||
///
|
||||
/// We are doing it via a macro as Rust doesn't support async closures that
|
||||
/// take on parameters with lifetimes.
|
||||
async fn lock_file(&self) -> Result<FileGuard<'_>, Error> {
|
||||
async fn lock_file(&self) -> Result<FileGuard, Error> {
|
||||
let open_files = get_open_files();
|
||||
|
||||
let mut handle_guard = {
|
||||
@@ -458,10 +503,9 @@ impl VirtualFile {
|
||||
// NB: we use StorageIoOperation::OpenAferReplace for this to distinguish this
|
||||
// case from StorageIoOperation::Open. This helps with identifying thrashing
|
||||
// of the virtual file descriptor cache.
|
||||
let file = observe_duration!(
|
||||
StorageIoOperation::OpenAfterReplace,
|
||||
self.open_options.open(&self.path)
|
||||
)?;
|
||||
let file = observe_duration!(StorageIoOperation::OpenAfterReplace, {
|
||||
self.open_options.open(self.path.as_std_path()).await?
|
||||
});
|
||||
|
||||
// Store the File in the slot and update the handle in the VirtualFile
|
||||
// to point to it.
|
||||
@@ -486,9 +530,8 @@ impl VirtualFile {
|
||||
self.pos = offset;
|
||||
}
|
||||
SeekFrom::End(offset) => {
|
||||
self.pos = with_file!(self, StorageIoOperation::Seek, |file| file
|
||||
.as_ref()
|
||||
.seek(SeekFrom::End(offset)))?
|
||||
self.pos = with_file!(self, StorageIoOperation::Seek, |mut file_guard| file_guard
|
||||
.with_std_file_mut(|std_file| std_file.seek(SeekFrom::End(offset))))?
|
||||
}
|
||||
SeekFrom::Current(offset) => {
|
||||
let pos = self.pos as i128 + offset as i128;
|
||||
@@ -507,25 +550,28 @@ impl VirtualFile {
|
||||
Ok(self.pos)
|
||||
}
|
||||
|
||||
// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
|
||||
pub async fn read_exact_at(&self, mut buf: &mut [u8], mut offset: u64) -> Result<(), Error> {
|
||||
while !buf.is_empty() {
|
||||
match self.read_at(buf, offset).await {
|
||||
Ok(0) => {
|
||||
return Err(Error::new(
|
||||
std::io::ErrorKind::UnexpectedEof,
|
||||
"failed to fill whole buffer",
|
||||
))
|
||||
}
|
||||
Ok(n) => {
|
||||
buf = &mut buf[n..];
|
||||
offset += n as u64;
|
||||
}
|
||||
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
pub async fn read_exact_at<B>(&self, buf: B, offset: u64) -> Result<B, Error>
|
||||
where
|
||||
B: IoBufMut + Send,
|
||||
{
|
||||
let (buf, res) =
|
||||
read_exact_at_impl(buf, offset, |buf, offset| self.read_at(buf, offset)).await;
|
||||
res.map(|()| buf)
|
||||
}
|
||||
|
||||
/// Like [`Self::read_exact_at`] but for [`PageWriteGuard`].
|
||||
pub async fn read_exact_at_page(
|
||||
&self,
|
||||
page: PageWriteGuard<'static>,
|
||||
offset: u64,
|
||||
) -> Result<PageWriteGuard<'static>, Error> {
|
||||
let buf = PageWriteGuardBuf {
|
||||
page,
|
||||
init_up_to: 0,
|
||||
};
|
||||
let res = self.read_exact_at(buf, offset).await;
|
||||
res.map(|PageWriteGuardBuf { page, .. }| page)
|
||||
.map_err(|e| Error::new(ErrorKind::Other, e))
|
||||
}
|
||||
|
||||
// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
|
||||
@@ -575,22 +621,35 @@ impl VirtualFile {
|
||||
Ok(n)
|
||||
}
|
||||
|
||||
pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
|
||||
let result = with_file!(self, StorageIoOperation::Read, |file| file
|
||||
.as_ref()
|
||||
.read_at(buf, offset));
|
||||
if let Ok(size) = result {
|
||||
STORAGE_IO_SIZE
|
||||
.with_label_values(&["read", &self.tenant_id, &self.shard_id, &self.timeline_id])
|
||||
.add(size as i64);
|
||||
}
|
||||
result
|
||||
pub(crate) async fn read_at<B>(&self, buf: B, offset: u64) -> (B, Result<usize, Error>)
|
||||
where
|
||||
B: tokio_epoll_uring::BoundedBufMut + Send,
|
||||
{
|
||||
let file_guard = match self.lock_file().await {
|
||||
Ok(file_guard) => file_guard,
|
||||
Err(e) => return (buf, Err(e)),
|
||||
};
|
||||
|
||||
observe_duration!(StorageIoOperation::Read, {
|
||||
let ((_file_guard, buf), res) = io_engine::get().read_at(file_guard, offset, buf).await;
|
||||
if let Ok(size) = res {
|
||||
STORAGE_IO_SIZE
|
||||
.with_label_values(&[
|
||||
"read",
|
||||
&self.tenant_id,
|
||||
&self.shard_id,
|
||||
&self.timeline_id,
|
||||
])
|
||||
.add(size as i64);
|
||||
}
|
||||
(buf, res)
|
||||
})
|
||||
}
|
||||
|
||||
async fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
|
||||
let result = with_file!(self, StorageIoOperation::Write, |file| file
|
||||
.as_ref()
|
||||
.write_at(buf, offset));
|
||||
let result = with_file!(self, StorageIoOperation::Write, |file_guard| {
|
||||
file_guard.with_std_file(|std_file| std_file.write_at(buf, offset))
|
||||
});
|
||||
if let Ok(size) = result {
|
||||
STORAGE_IO_SIZE
|
||||
.with_label_values(&["write", &self.tenant_id, &self.shard_id, &self.timeline_id])
|
||||
@@ -600,18 +659,241 @@ impl VirtualFile {
|
||||
}
|
||||
}
|
||||
|
||||
struct FileGuard<'a> {
|
||||
slot_guard: RwLockReadGuard<'a, SlotInner>,
|
||||
// Adapted from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
|
||||
pub async fn read_exact_at_impl<B, F, Fut>(
|
||||
buf: B,
|
||||
mut offset: u64,
|
||||
mut read_at: F,
|
||||
) -> (B, std::io::Result<()>)
|
||||
where
|
||||
B: IoBufMut + Send,
|
||||
F: FnMut(tokio_epoll_uring::Slice<B>, u64) -> Fut,
|
||||
Fut: std::future::Future<Output = (tokio_epoll_uring::Slice<B>, std::io::Result<usize>)>,
|
||||
{
|
||||
use tokio_epoll_uring::BoundedBuf;
|
||||
let mut buf: tokio_epoll_uring::Slice<B> = buf.slice_full(); // includes all the uninitialized memory
|
||||
while buf.bytes_total() != 0 {
|
||||
let res;
|
||||
(buf, res) = read_at(buf, offset).await;
|
||||
match res {
|
||||
Ok(0) => break,
|
||||
Ok(n) => {
|
||||
buf = buf.slice(n..);
|
||||
offset += n as u64;
|
||||
}
|
||||
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
|
||||
Err(e) => return (buf.into_inner(), Err(e)),
|
||||
}
|
||||
}
|
||||
// NB: don't use `buf.is_empty()` here; it is from the
|
||||
// `impl Deref for Slice { Target = [u8] }`; the the &[u8]
|
||||
// returned by it only covers the initialized portion of `buf`.
|
||||
// Whereas we're interested in ensuring that we filled the entire
|
||||
// buffer that the user passed in.
|
||||
if buf.bytes_total() != 0 {
|
||||
(
|
||||
buf.into_inner(),
|
||||
Err(std::io::Error::new(
|
||||
std::io::ErrorKind::UnexpectedEof,
|
||||
"failed to fill whole buffer",
|
||||
)),
|
||||
)
|
||||
} else {
|
||||
assert_eq!(buf.len(), buf.bytes_total());
|
||||
(buf.into_inner(), Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> AsRef<File> for FileGuard<'a> {
|
||||
fn as_ref(&self) -> &File {
|
||||
#[cfg(test)]
|
||||
mod test_read_exact_at_impl {
|
||||
|
||||
use std::{collections::VecDeque, sync::Arc};
|
||||
|
||||
use tokio_epoll_uring::{BoundedBuf, BoundedBufMut};
|
||||
|
||||
use super::read_exact_at_impl;
|
||||
|
||||
struct Expectation {
|
||||
offset: u64,
|
||||
bytes_total: usize,
|
||||
result: std::io::Result<Vec<u8>>,
|
||||
}
|
||||
struct MockReadAt {
|
||||
expectations: VecDeque<Expectation>,
|
||||
}
|
||||
|
||||
impl MockReadAt {
|
||||
async fn read_at(
|
||||
&mut self,
|
||||
mut buf: tokio_epoll_uring::Slice<Vec<u8>>,
|
||||
offset: u64,
|
||||
) -> (tokio_epoll_uring::Slice<Vec<u8>>, std::io::Result<usize>) {
|
||||
let exp = self
|
||||
.expectations
|
||||
.pop_front()
|
||||
.expect("read_at called but we have no expectations left");
|
||||
assert_eq!(exp.offset, offset);
|
||||
assert_eq!(exp.bytes_total, buf.bytes_total());
|
||||
match exp.result {
|
||||
Ok(bytes) => {
|
||||
assert!(bytes.len() <= buf.bytes_total());
|
||||
buf.put_slice(&bytes);
|
||||
(buf, Ok(bytes.len()))
|
||||
}
|
||||
Err(e) => (buf, Err(e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for MockReadAt {
|
||||
fn drop(&mut self) {
|
||||
assert_eq!(self.expectations.len(), 0);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_basic() {
|
||||
let buf = Vec::with_capacity(5);
|
||||
let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
|
||||
expectations: VecDeque::from(vec![Expectation {
|
||||
offset: 0,
|
||||
bytes_total: 5,
|
||||
result: Ok(vec![b'a', b'b', b'c', b'd', b'e']),
|
||||
}]),
|
||||
}));
|
||||
let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
|
||||
let mock_read_at = Arc::clone(&mock_read_at);
|
||||
async move { mock_read_at.lock().await.read_at(buf, offset).await }
|
||||
})
|
||||
.await;
|
||||
assert!(res.is_ok());
|
||||
assert_eq!(buf, vec![b'a', b'b', b'c', b'd', b'e']);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_empty_buf_issues_no_syscall() {
|
||||
let buf = Vec::new();
|
||||
let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
|
||||
expectations: VecDeque::new(),
|
||||
}));
|
||||
let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
|
||||
let mock_read_at = Arc::clone(&mock_read_at);
|
||||
async move { mock_read_at.lock().await.read_at(buf, offset).await }
|
||||
})
|
||||
.await;
|
||||
assert!(res.is_ok());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_two_read_at_calls_needed_until_buf_filled() {
|
||||
let buf = Vec::with_capacity(4);
|
||||
let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
|
||||
expectations: VecDeque::from(vec![
|
||||
Expectation {
|
||||
offset: 0,
|
||||
bytes_total: 4,
|
||||
result: Ok(vec![b'a', b'b']),
|
||||
},
|
||||
Expectation {
|
||||
offset: 2,
|
||||
bytes_total: 2,
|
||||
result: Ok(vec![b'c', b'd']),
|
||||
},
|
||||
]),
|
||||
}));
|
||||
let (buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
|
||||
let mock_read_at = Arc::clone(&mock_read_at);
|
||||
async move { mock_read_at.lock().await.read_at(buf, offset).await }
|
||||
})
|
||||
.await;
|
||||
assert!(res.is_ok());
|
||||
assert_eq!(buf, vec![b'a', b'b', b'c', b'd']);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_eof_before_buffer_full() {
|
||||
let buf = Vec::with_capacity(3);
|
||||
let mock_read_at = Arc::new(tokio::sync::Mutex::new(MockReadAt {
|
||||
expectations: VecDeque::from(vec![
|
||||
Expectation {
|
||||
offset: 0,
|
||||
bytes_total: 3,
|
||||
result: Ok(vec![b'a']),
|
||||
},
|
||||
Expectation {
|
||||
offset: 1,
|
||||
bytes_total: 2,
|
||||
result: Ok(vec![b'b']),
|
||||
},
|
||||
Expectation {
|
||||
offset: 2,
|
||||
bytes_total: 1,
|
||||
result: Ok(vec![]),
|
||||
},
|
||||
]),
|
||||
}));
|
||||
let (_buf, res) = read_exact_at_impl(buf, 0, |buf, offset| {
|
||||
let mock_read_at = Arc::clone(&mock_read_at);
|
||||
async move { mock_read_at.lock().await.read_at(buf, offset).await }
|
||||
})
|
||||
.await;
|
||||
let Err(err) = res else {
|
||||
panic!("should return an error");
|
||||
};
|
||||
assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
|
||||
assert_eq!(format!("{err}"), "failed to fill whole buffer");
|
||||
// buffer contents on error are unspecified
|
||||
}
|
||||
}
|
||||
|
||||
struct FileGuard {
|
||||
slot_guard: RwLockReadGuard<'static, SlotInner>,
|
||||
}
|
||||
|
||||
impl AsRef<OwnedFd> for FileGuard {
|
||||
fn as_ref(&self) -> &OwnedFd {
|
||||
// This unwrap is safe because we only create `FileGuard`s
|
||||
// if we know that the file is Some.
|
||||
self.slot_guard.file.as_ref().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
impl FileGuard {
|
||||
/// Soft deprecation: we'll move VirtualFile to async APIs and remove this function eventually.
|
||||
fn with_std_file<F, R>(&self, with: F) -> R
|
||||
where
|
||||
F: FnOnce(&File) -> R,
|
||||
{
|
||||
// SAFETY:
|
||||
// - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`.
|
||||
// - `&` usage below: `self` is `&`, hence Rust typesystem guarantees there are is no `&mut`
|
||||
let file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) };
|
||||
let res = with(&file);
|
||||
let _ = file.into_raw_fd();
|
||||
res
|
||||
}
|
||||
/// Soft deprecation: we'll move VirtualFile to async APIs and remove this function eventually.
|
||||
fn with_std_file_mut<F, R>(&mut self, with: F) -> R
|
||||
where
|
||||
F: FnOnce(&mut File) -> R,
|
||||
{
|
||||
// SAFETY:
|
||||
// - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`.
|
||||
// - &mut usage below: `self` is `&mut`, hence this call is the only task/thread that has control over the underlying fd
|
||||
let mut file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) };
|
||||
let res = with(&mut file);
|
||||
let _ = file.into_raw_fd();
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
impl tokio_epoll_uring::IoFd for FileGuard {
|
||||
unsafe fn as_fd(&self) -> RawFd {
|
||||
let owned_fd: &OwnedFd = self.as_ref();
|
||||
owned_fd.as_raw_fd()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
impl VirtualFile {
|
||||
pub(crate) async fn read_blk(
|
||||
@@ -619,16 +901,19 @@ impl VirtualFile {
|
||||
blknum: u32,
|
||||
) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
|
||||
use crate::page_cache::PAGE_SZ;
|
||||
let mut buf = [0; PAGE_SZ];
|
||||
self.read_exact_at(&mut buf, blknum as u64 * (PAGE_SZ as u64))
|
||||
let buf = vec![0; PAGE_SZ];
|
||||
let buf = self
|
||||
.read_exact_at(buf, blknum as u64 * (PAGE_SZ as u64))
|
||||
.await?;
|
||||
Ok(std::sync::Arc::new(buf).into())
|
||||
Ok(crate::tenant::block_io::BlockLease::Vec(buf))
|
||||
}
|
||||
|
||||
async fn read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<(), Error> {
|
||||
let mut tmp = vec![0; 128];
|
||||
loop {
|
||||
let mut tmp = [0; 128];
|
||||
match self.read_at(&mut tmp, self.pos).await {
|
||||
let res;
|
||||
(tmp, res) = self.read_at(tmp, self.pos).await;
|
||||
match res {
|
||||
Ok(0) => return Ok(()),
|
||||
Ok(n) => {
|
||||
self.pos += n as u64;
|
||||
@@ -704,10 +989,12 @@ impl OpenFiles {
|
||||
/// Initialize the virtual file module. This must be called once at page
|
||||
/// server startup.
|
||||
///
|
||||
pub fn init(num_slots: usize) {
|
||||
#[cfg(not(test))]
|
||||
pub fn init(num_slots: usize, engine: IoEngineKind) {
|
||||
if OPEN_FILES.set(OpenFiles::new(num_slots)).is_err() {
|
||||
panic!("virtual_file::init called twice");
|
||||
}
|
||||
io_engine::init(engine);
|
||||
crate::metrics::virtual_file_descriptor_cache::SIZE_MAX.set(num_slots as u64);
|
||||
}
|
||||
|
||||
@@ -752,10 +1039,10 @@ mod tests {
|
||||
}
|
||||
|
||||
impl MaybeVirtualFile {
|
||||
async fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> Result<(), Error> {
|
||||
async fn read_exact_at(&self, mut buf: Vec<u8>, offset: u64) -> Result<Vec<u8>, Error> {
|
||||
match self {
|
||||
MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset).await,
|
||||
MaybeVirtualFile::File(file) => file.read_exact_at(buf, offset),
|
||||
MaybeVirtualFile::File(file) => file.read_exact_at(&mut buf, offset).map(|()| buf),
|
||||
}
|
||||
}
|
||||
async fn write_all_at(&self, buf: &[u8], offset: u64) -> Result<(), Error> {
|
||||
@@ -797,14 +1084,14 @@ mod tests {
|
||||
|
||||
// Helper function to slurp a portion of a file into a string
|
||||
async fn read_string_at(&mut self, pos: u64, len: usize) -> Result<String, Error> {
|
||||
let mut buf = vec![0; len];
|
||||
self.read_exact_at(&mut buf, pos).await?;
|
||||
let buf = vec![0; len];
|
||||
let buf = self.read_exact_at(buf, pos).await?;
|
||||
Ok(String::from_utf8(buf).unwrap())
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_virtual_files() -> Result<(), Error> {
|
||||
async fn test_virtual_files() -> anyhow::Result<()> {
|
||||
// The real work is done in the test_files() helper function. This
|
||||
// allows us to run the same set of tests against a native File, and
|
||||
// VirtualFile. We trust the native Files and wouldn't need to test them,
|
||||
@@ -820,14 +1107,17 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_physical_files() -> Result<(), Error> {
|
||||
async fn test_physical_files() -> anyhow::Result<()> {
|
||||
test_files("physical_files", |path, open_options| async move {
|
||||
Ok(MaybeVirtualFile::File(open_options.open(path)?))
|
||||
Ok(MaybeVirtualFile::File({
|
||||
let owned_fd = open_options.open(path.as_std_path()).await?;
|
||||
File::from(owned_fd)
|
||||
}))
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
async fn test_files<OF, FT>(testname: &str, openfunc: OF) -> Result<(), Error>
|
||||
async fn test_files<OF, FT>(testname: &str, openfunc: OF) -> anyhow::Result<()>
|
||||
where
|
||||
OF: Fn(Utf8PathBuf, OpenOptions) -> FT,
|
||||
FT: Future<Output = Result<MaybeVirtualFile, std::io::Error>>,
|
||||
@@ -971,11 +1261,11 @@ mod tests {
|
||||
for _threadno in 0..THREADS {
|
||||
let files = files.clone();
|
||||
let hdl = rt.spawn(async move {
|
||||
let mut buf = [0u8; SIZE];
|
||||
let mut buf = vec![0u8; SIZE];
|
||||
let mut rng = rand::rngs::OsRng;
|
||||
for _ in 1..1000 {
|
||||
let f = &files[rng.gen_range(0..files.len())];
|
||||
f.read_exact_at(&mut buf, 0).await.unwrap();
|
||||
buf = f.read_exact_at(buf, 0).await.unwrap();
|
||||
assert!(buf == SAMPLE);
|
||||
}
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user