fix cfg(test) and provide again VirtualFile::read_at, used only in cfg(test)

This commit is contained in:
Christian Schwarz
2023-12-11 20:15:41 +00:00
parent 0ca94f5b53
commit 69158a33dd
5 changed files with 184 additions and 141 deletions

4
Cargo.lock generated
View File

@@ -5158,7 +5158,7 @@ dependencies = [
[[package]]
name = "tokio-epoll-uring"
version = "0.1.0"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#53c832d322244f704c7612125496df5ac117ac39"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#2ae71cebd8f032ebfa987b713a77bc3d23aac57c"
dependencies = [
"futures",
"once_cell",
@@ -5714,7 +5714,7 @@ dependencies = [
[[package]]
name = "uring-common"
version = "0.1.0"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#53c832d322244f704c7612125496df5ac117ac39"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#2ae71cebd8f032ebfa987b713a77bc3d23aac57c"
dependencies = [
"io-uring",
"libc",

View File

@@ -83,7 +83,6 @@ enum-map.workspace = true
enumset.workspace = true
strum.workspace = true
strum_macros.workspace = true
# WIP PR: https://github.com/neondatabase/tokio-epoll-uring/pull/25
#tokio-epoll-uring = { path = "../../tokio-epoll-uring/tokio-epoll-uring" }
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" }

View File

@@ -39,6 +39,8 @@ pub enum BlockLease<'a> {
EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
#[cfg(test)]
Arc(std::sync::Arc<[u8; PAGE_SZ]>),
#[cfg(test)]
Vec(Vec<u8>),
}
impl From<PageReadGuard<'static>> for BlockLease<'static> {
@@ -63,6 +65,13 @@ impl<'a> Deref for BlockLease<'a> {
BlockLease::EphemeralFileMutableTail(v) => v,
#[cfg(test)]
BlockLease::Arc(v) => v.deref(),
#[cfg(test)]
BlockLease::Vec(v) => {
let v: &Vec<u8> = v;
assert_eq!(v.len(), PAGE_SZ, "caller must ensure that v has PAGE_SZ");
// Safety: see above assertion.
unsafe { &*(v.as_ptr() as *const [u8; PAGE_SZ]) }
}
}
}
}
@@ -176,7 +185,7 @@ impl FileBlockReader {
) -> Result<PageWriteGuard<'static>, std::io::Error> {
assert!(buf.len() == PAGE_SZ);
self.file
.read_exact_at(buf, blkno as u64 * PAGE_SZ as u64)
.read_exact_at_page(buf, blkno as u64 * PAGE_SZ as u64)
.await
}
/// Read a block.

View File

@@ -91,7 +91,7 @@ impl EphemeralFile {
page_cache::ReadBufResult::NotFound(write_guard) => {
let write_guard = self
.file
.read_exact_at(write_guard, blknum as u64 * PAGE_SZ as u64)
.read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64)
.await?;
let read_guard = write_guard.mark_valid();
return Ok(BlockLease::PageReadGuard(read_guard));

View File

@@ -21,6 +21,7 @@ use std::os::unix::fs::FileExt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::time::Instant;
use tokio_epoll_uring::IoBufMut;
use utils::fs_ext;
///
@@ -108,6 +109,37 @@ struct SlotInner {
file: Option<File>,
}
/// Owned buffer wrapper around a [`PageWriteGuard`] that implements
/// [`tokio_epoll_uring::IoBuf`] and [`tokio_epoll_uring::IoBufMut`], so that a
/// page cache slot can be passed by value as the destination of a read.
struct PageWriteGuardBuf {
// The page cache slot that the read writes into.
page: PageWriteGuard<'static>,
// Value reported by `IoBuf::bytes_init()`; updated only through `IoBufMut::set_init()`.
init_up_to: usize,
}
// Safety: the [`PageWriteGuard`] gives us exclusive ownership of the page cache slot,
// and the location remains stable even if [`Self`] or the [`PageWriteGuard`] is moved.
unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf {
// Pointer to the first byte of the wrapped page.
fn stable_ptr(&self) -> *const u8 {
self.page.as_ptr()
}
// Number of bytes reported as initialized; this is whatever was last passed to `set_init()`.
fn bytes_init(&self) -> usize {
self.init_up_to
}
// Total capacity of the buffer: the full length of the wrapped page.
fn bytes_total(&self) -> usize {
self.page.len()
}
}
// Safety: see above, plus: the ownership of [`PageWriteGuard`] means exclusive access,
// hence it's safe to hand out the `stable_mut_ptr()`.
unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf {
// Mutable pointer to the first byte of the wrapped page.
fn stable_mut_ptr(&mut self) -> *mut u8 {
self.page.as_mut_ptr()
}
// Records `pos` as the new initialized length.
// Panics if `pos` exceeds the length of the wrapped page.
unsafe fn set_init(&mut self, pos: usize) {
assert!(pos <= self.page.len());
self.init_up_to = pos;
}
}
impl OpenFiles {
/// Find a slot to use, evicting an existing file descriptor if needed.
///
@@ -291,61 +323,6 @@ impl VirtualFile {
Self::open_with_options_async(path, options).await
}
/// Open a file with given options.
///
/// Note: If any custom flags were set in 'open_options' through OpenOptionsExt,
/// they will be applied also when the file is subsequently re-opened, not only
/// on the first time. Make sure that's sane!
#[cfg(test)]
pub async fn open_with_options(
path: &Utf8Path,
open_options: &OpenOptions,
) -> Result<VirtualFile, std::io::Error> {
let path_str = path.to_string();
let parts = path_str.split('/').collect::<Vec<&str>>();
let tenant_id;
let timeline_id;
if parts.len() > 5 && parts[parts.len() - 5] == TENANTS_SEGMENT_NAME {
tenant_id = parts[parts.len() - 4].to_string();
timeline_id = parts[parts.len() - 2].to_string();
} else {
tenant_id = "*".to_string();
timeline_id = "*".to_string();
}
let (handle, mut slot_guard) = get_open_files().find_victim_slot().await;
// NB: there is also StorageIoOperation::OpenAfterReplace which is for the case
// where our caller doesn't get to use the returned VirtualFile before its
// slot gets re-used by someone else.
let file = observe_duration!(StorageIoOperation::Open, open_options.open(path))?;
// Strip all options other than read and write.
//
// It would perhaps be nicer to check just for the read and write flags
// explicitly, but OpenOptions doesn't contain any functions to read flags,
// only to set them.
let mut reopen_options = open_options.clone();
reopen_options.create(false);
reopen_options.create_new(false);
reopen_options.truncate(false);
let vfile = VirtualFile {
handle: RwLock::new(handle),
pos: 0,
path: path.to_path_buf(),
open_options: reopen_options,
tenant_id,
timeline_id,
};
// TODO: Under pressure, it's likely the slot will get re-used and
// the underlying file closed before they get around to using it.
// => https://github.com/neondatabase/neon/issues/6065
slot_guard.file.replace(file);
Ok(vfile)
}
/// Writes a file to the specified `final_path` in a crash-safe fashion
///
/// The file is first written to the specified tmp_path, and in a second
@@ -424,8 +401,7 @@ impl VirtualFile {
std::io::Error::new(std::io::ErrorKind::Other, system)
}
})?;
let file = File::from(file);
file
File::from(file)
}));
// Strip all options other than read and write.
@@ -526,8 +502,7 @@ impl VirtualFile {
std::io::Error::new(std::io::ErrorKind::Other, system)
}
})?;
let file = File::from(file);
file
File::from(file)
});
// Store the File in the slot and update the handle in the VirtualFile
@@ -575,66 +550,47 @@ impl VirtualFile {
}
// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
pub async fn read_exact_at(
/// Read exactly `buf.bytes_total()` bytes starting at `offset`, retrying on
/// short reads and on [`std::io::ErrorKind::Interrupted`].
///
/// The buffer is passed by value and handed back on success, as required by
/// the owned-buffer API of [`Self::read_at`].
///
/// # Errors
///
/// - [`std::io::ErrorKind::UnexpectedEof`] if end-of-file is reached before
///   the whole buffer has been filled.
/// - Any other error returned by [`Self::read_at`], unchanged. Note that the
///   buffer is dropped in the error case.
pub async fn read_exact_at<B>(&self, buf: B, mut offset: u64) -> Result<B, Error>
where
    B: IoBufMut + Send,
{
    use tokio_epoll_uring::BoundedBuf;
    // `buf` always denotes the part of the caller's buffer that is still unfilled.
    let mut buf: tokio_epoll_uring::Slice<B> = buf.slice_full();
    while buf.bytes_total() != 0 {
        let res;
        (buf, res) = self.read_at(buf, offset).await;
        match res {
            // EOF: stop; the check below decides whether that is an error.
            Ok(0) => break,
            Ok(n) => {
                // Advance past the bytes that were just read.
                buf = buf.slice(n..);
                offset += n as u64;
            }
            // Retry the same read.
            Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
            Err(e) => return Err(e),
        }
    }
    // NB: don't use `buf.is_empty()` here: it resolves through the slice's
    // `Deref<Target = [u8]>`, which only covers the *initialized* portion of
    // the buffer. For a buffer that starts out uninitialized, that would
    // report "empty" after a short read and silently turn EOF into success.
    // Use the same measure as the loop condition: remaining capacity.
    if buf.bytes_total() != 0 {
        Err(std::io::Error::new(
            std::io::ErrorKind::UnexpectedEof,
            "failed to fill whole buffer",
        ))
    } else {
        Ok(buf.into_inner())
    }
}
/// Like [`Self::read_exact_at`] but for [`PageWriteGuard`].
pub async fn read_exact_at_page(
&self,
page: PageWriteGuard<'static>,
offset: u64,
) -> Result<PageWriteGuard<'static>, Error> {
with_file!(self, StorageIoOperation::Read, |file_guard| {
self.read_exact_at0(file_guard, page, offset).await
})
}
async fn read_exact_at0(
&self,
file_guard: FileGuard<'static>,
write_guard: PageWriteGuard<'static>,
offset: u64,
) -> Result<PageWriteGuard<'static>, Error> {
let system = tokio_epoll_uring::thread_local_system().await;
struct PageWriteGuardBuf {
buf: PageWriteGuard<'static>,
init_up_to: usize,
}
unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf {
fn stable_ptr(&self) -> *const u8 {
self.buf.as_ptr()
}
fn bytes_init(&self) -> usize {
self.init_up_to
}
fn bytes_total(&self) -> usize {
self.buf.len()
}
}
unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf {
fn stable_mut_ptr(&mut self) -> *mut u8 {
self.buf.as_mut_ptr()
}
unsafe fn set_init(&mut self, pos: usize) {
assert!(pos <= self.buf.len());
self.init_up_to = pos;
}
}
let buf = PageWriteGuardBuf {
buf: write_guard,
page,
init_up_to: 0,
};
let owned_fd = unsafe { OwnedFd::from_raw_fd(file_guard.as_ref().as_raw_fd()) };
let guard = scopeguard::guard(file_guard, |_| {
panic!("must not drop future while operation is ongoing (todo: pass file_guard to tokio_epoll_uring to aovid this)")
});
let ((owned_fd, buf), res) = system.read(owned_fd, offset, buf).await;
let _ = OwnedFd::into_raw_fd(owned_fd);
let _ = scopeguard::ScopeGuard::into_inner(guard);
let PageWriteGuardBuf {
buf: write_guard,
init_up_to,
} = buf;
if let Ok(num_read) = res {
assert!(init_up_to == num_read); // TODO need to deal with short reads here
}
res.map(|_| write_guard)
let res = self.read_exact_at(buf, offset).await;
res.map(|PageWriteGuardBuf { page, .. }| page)
.map_err(|e| Error::new(ErrorKind::Other, e))
}
@@ -685,11 +641,59 @@ impl VirtualFile {
Ok(n)
}
/// Perform a single read into `buf` at `offset`.
///
/// Returns the buffer together with the result of the read. If the underlying
/// file cannot be locked, the buffer is handed back untouched along with the
/// locking error. The read itself is timed as [`StorageIoOperation::Read`].
pub(crate) async fn read_at<B>(&self, buf: B, offset: u64) -> (B, Result<usize, Error>)
where
    B: tokio_epoll_uring::BoundedBufMut + Send,
{
    match self.lock_file().await {
        // Could not get hold of the file: give the buffer back to the caller.
        Err(lock_err) => (buf, Err(lock_err)),
        Ok(locked) => observe_duration!(StorageIoOperation::Read, {
            self.read_at0(locked, buf, offset).await
        }),
    }
}
// Inner part of `read_at`: submits one read of `buf` at `offset` on the fd behind
// `file_guard` via the thread-local `tokio_epoll_uring` system.
//
// Returns the buffer together with the number of bytes read, or the error
// wrapped as `ErrorKind::Other`. On success, the byte count is added to the
// "read" series of `STORAGE_IO_SIZE` for this file's tenant and timeline.
//
// If the returned future is dropped while the read is in flight, the process aborts.
async fn read_at0<B>(
&self,
file_guard: FileGuard<'_>,
buf: B,
offset: u64,
) -> (B, Result<usize, Error>)
where
B: tokio_epoll_uring::BoundedBufMut + Send,
{
let system = tokio_epoll_uring::thread_local_system().await;
// SAFETY: when file_guard gets dropped, the raw fd becomes invalid or may get re-used
// while the io_uring operation is still executing.
// The `file_guard` could get dropped due to future cancellation-by-drop.
// We prevent this situation using the scopeguard: it will abort the process in such cases.
// Fixing this is subject of https://github.com/neondatabase/neon/pull/6101
let owned_fd = unsafe { OwnedFd::from_raw_fd(file_guard.as_ref().as_raw_fd()) };
let guard = scopeguard::guard(file_guard, |_| {
eprintln!("must not drop future while operation is ongoing (todo: pass file_guard to tokio_epoll_uring to aovid this)");
std::process::abort();
});
let ((owned_fd, buf), res) = system.read(owned_fd, offset, buf).await;
// The fd still belongs to `file_guard`: convert the temporary `OwnedFd` back into a
// raw fd so that it is not closed here.
let _ = OwnedFd::into_raw_fd(owned_fd);
// The operation has completed: defuse the abort-on-drop guard.
let _ = scopeguard::ScopeGuard::into_inner(guard);
if let Ok(size) = res {
// TODO: don't use with_label_values on hot path
// https://github.com/neondatabase/neon/issues/6107
STORAGE_IO_SIZE
.with_label_values(&["read", &self.tenant_id, &self.timeline_id])
.add(size as i64);
}
(buf, res.map_err(|e| Error::new(ErrorKind::Other, e)))
}
async fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
let result = with_file!(self, StorageIoOperation::Write, |file| file
.as_ref()
.write_at(buf, offset));
if let Ok(size) = result {
// TODO: don't use with_label_values on hot path
// https://github.com/neondatabase/neon/issues/6107
STORAGE_IO_SIZE
.with_label_values(&["write", &self.tenant_id, &self.timeline_id])
.add(size as i64);
@@ -717,16 +721,19 @@ impl VirtualFile {
blknum: u32,
) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
use crate::page_cache::PAGE_SZ;
let mut buf = [0; PAGE_SZ];
self.read_exact_at(&mut buf, blknum as u64 * (PAGE_SZ as u64))
let buf = vec![0; PAGE_SZ];
let buf = self
.read_exact_at(buf, blknum as u64 * (PAGE_SZ as u64))
.await?;
Ok(std::sync::Arc::new(buf).into())
Ok(crate::tenant::block_io::BlockLease::Vec(buf))
}
async fn read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<(), Error> {
let mut tmp = vec![0; 128];
loop {
let mut tmp = [0; 128];
match self.read_at(&mut tmp, self.pos).await {
let res;
(tmp, res) = self.read_at(tmp, self.pos).await;
match res {
Ok(0) => return Ok(()),
Ok(n) => {
self.pos += n as u64;
@@ -850,10 +857,10 @@ mod tests {
}
impl MaybeVirtualFile {
async fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> Result<(), Error> {
async fn read_exact_at(&self, mut buf: Vec<u8>, offset: u64) -> Result<Vec<u8>, Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset).await,
MaybeVirtualFile::File(file) => file.read_exact_at(buf, offset),
MaybeVirtualFile::File(file) => file.read_exact_at(&mut buf, offset).map(|()| buf),
}
}
async fn write_all_at(&self, buf: &[u8], offset: u64) -> Result<(), Error> {
@@ -895,14 +902,15 @@ mod tests {
// Helper function to slurp a portion of a file into a string
async fn read_string_at(&mut self, pos: u64, len: usize) -> Result<String, Error> {
let mut buf = vec![0; len];
self.read_exact_at(&mut buf, pos).await?;
let buf = vec![0; len];
let buf = self.read_exact_at(buf, pos).await?;
Ok(String::from_utf8(buf).unwrap())
}
}
#[tokio::test]
async fn test_virtual_files() -> Result<(), Error> {
async fn test_virtual_files() -> anyhow::Result<()> {
crate::tenant::harness::setup_logging();
// The real work is done in the test_files() helper function. This
// allows us to run the same set of tests against a native File, and
// VirtualFile. We trust the native Files and wouldn't need to test them,
@@ -911,23 +919,35 @@ mod tests {
// native files, you will run out of file descriptors if the ulimit
// is low enough.)
test_files("virtual_files", |path, open_options| async move {
let vf = VirtualFile::open_with_options(&path, &open_options).await?;
let vf = VirtualFile::open_with_options_async(&path, open_options).await?;
Ok(MaybeVirtualFile::VirtualFile(vf))
})
.await
}
#[tokio::test]
async fn test_physical_files() -> Result<(), Error> {
async fn test_physical_files() -> anyhow::Result<()> {
test_files("physical_files", |path, open_options| async move {
Ok(MaybeVirtualFile::File(open_options.open(path)?))
Ok(MaybeVirtualFile::File({
let system = tokio_epoll_uring::thread_local_system().await;
let owned_fd = system
.open(path, &open_options)
.await
.map_err(|e| match e {
tokio_epoll_uring::Error::Op(e) => e,
tokio_epoll_uring::Error::System(system) => {
std::io::Error::new(std::io::ErrorKind::Other, system)
}
})?;
File::from(owned_fd)
}))
})
.await
}
async fn test_files<OF, FT>(testname: &str, openfunc: OF) -> Result<(), Error>
async fn test_files<OF, FT>(testname: &str, openfunc: OF) -> anyhow::Result<()>
where
OF: Fn(Utf8PathBuf, OpenOptions) -> FT,
OF: Fn(Utf8PathBuf, tokio_epoll_uring::ops::open_at::OpenOptions) -> FT,
FT: Future<Output = Result<MaybeVirtualFile, std::io::Error>>,
{
let testdir = crate::config::PageServerConf::test_repo_dir(testname);
@@ -936,7 +956,7 @@ mod tests {
let path_a = testdir.join("file_a");
let mut file_a = openfunc(
path_a.clone(),
OpenOptions::new()
tokio_epoll_uring::ops::open_at::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
@@ -949,7 +969,13 @@ mod tests {
let _ = file_a.read_string().await.unwrap_err();
// Close the file and re-open for reading
let mut file_a = openfunc(path_a, OpenOptions::new().read(true).to_owned()).await?;
let mut file_a = openfunc(
path_a,
tokio_epoll_uring::ops::open_at::OpenOptions::new()
.read(true)
.to_owned(),
)
.await?;
// cannot write to a file opened in read-only mode
let _ = file_a.write_all(b"bar").await.unwrap_err();
@@ -986,7 +1012,7 @@ mod tests {
let path_b = testdir.join("file_b");
let mut file_b = openfunc(
path_b.clone(),
OpenOptions::new()
tokio_epoll_uring::ops::open_at::OpenOptions::new()
.read(true)
.write(true)
.create(true)
@@ -1007,8 +1033,13 @@ mod tests {
let mut vfiles = Vec::new();
for _ in 0..100 {
let mut vfile =
openfunc(path_b.clone(), OpenOptions::new().read(true).to_owned()).await?;
let mut vfile = openfunc(
path_b.clone(),
tokio_epoll_uring::ops::open_at::OpenOptions::new()
.read(true)
.to_owned(),
)
.await?;
assert_eq!("FOOBAR", vfile.read_string().await?);
vfiles.push(vfile);
}
@@ -1053,8 +1084,12 @@ mod tests {
// Open the file many times.
let mut files = Vec::new();
for _ in 0..VIRTUAL_FILES {
let f = VirtualFile::open_with_options(&test_file_path, OpenOptions::new().read(true))
.await?;
let f = VirtualFile::open_with_options_async(&test_file_path, {
let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new();
options.read(true);
options
})
.await?;
files.push(f);
}
let files = Arc::new(files);
@@ -1069,11 +1104,11 @@ mod tests {
for _threadno in 0..THREADS {
let files = files.clone();
let hdl = rt.spawn(async move {
let mut buf = [0u8; SIZE];
let mut buf = vec![0u8; SIZE];
let mut rng = rand::rngs::OsRng;
for _ in 1..1000 {
let f = &files[rng.gen_range(0..files.len())];
f.read_exact_at(&mut buf, 0).await.unwrap();
buf = f.read_exact_at(buf, 0).await.unwrap();
assert!(buf == SAMPLE);
}
});