refactor(virtual_file): take owned buffer in VirtualFile::write_all (#6664)

Building atop #6660 , this PR converts VirtualFile::write_all to
owned buffers.

Part of https://github.com/neondatabase/neon/issues/6663
This commit is contained in:
Christian Schwarz
2024-02-13 18:46:25 +01:00
committed by GitHub
parent 331935df91
commit 7fa732c96c
8 changed files with 81 additions and 83 deletions

View File

@@ -19,7 +19,7 @@ use once_cell::sync::OnceCell;
use pageserver_api::shard::TenantShardId;
use std::fs::{self, File};
use std::io::{Error, ErrorKind, Seek, SeekFrom};
use tokio_epoll_uring::IoBufMut;
use tokio_epoll_uring::{BoundedBuf, IoBufMut, Slice};
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
use std::os::unix::fs::FileExt;
@@ -410,10 +410,10 @@ impl VirtualFile {
/// step, the tmp path is renamed to the final path. As renames are
/// atomic, a crash during the write operation will never leave behind a
/// partially written file.
pub async fn crashsafe_overwrite(
pub async fn crashsafe_overwrite<B: BoundedBuf>(
final_path: &Utf8Path,
tmp_path: &Utf8Path,
content: &[u8],
content: B,
) -> std::io::Result<()> {
let Some(final_path_parent) = final_path.parent() else {
return Err(std::io::Error::from_raw_os_error(
@@ -430,7 +430,8 @@ impl VirtualFile {
.create_new(true),
)
.await?;
file.write_all(content).await?;
let (_content, res) = file.write_all(content).await;
res?;
file.sync_all().await?;
drop(file); // before the rename, that's important!
// renames are atomic
@@ -601,23 +602,36 @@ impl VirtualFile {
Ok(())
}
pub async fn write_all(&mut self, mut buf: &[u8]) -> Result<(), Error> {
/// Writes `buf.slice(0..buf.bytes_init())`.
/// Returns the IoBuf that is underlying the BoundedBuf `buf`.
/// I.e., the returned value's `bytes_init()` method returns something different than the `bytes_init()` that was passed in.
/// It's quite brittle and easy to mis-use, so, we return the size in the Ok() variant.
pub async fn write_all<B: BoundedBuf>(&mut self, buf: B) -> (B::Buf, Result<usize, Error>) {
let nbytes = buf.bytes_init();
if nbytes == 0 {
return (Slice::into_inner(buf.slice_full()), Ok(0));
}
let mut buf = buf.slice(0..nbytes);
while !buf.is_empty() {
match self.write(buf).await {
// TODO: push `Slice` further down
match self.write(&buf).await {
Ok(0) => {
return Err(Error::new(
std::io::ErrorKind::WriteZero,
"failed to write whole buffer",
));
return (
Slice::into_inner(buf),
Err(Error::new(
std::io::ErrorKind::WriteZero,
"failed to write whole buffer",
)),
);
}
Ok(n) => {
buf = &buf[n..];
buf = buf.slice(n..);
}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
Err(e) => return (Slice::into_inner(buf), Err(e)),
}
}
Ok(())
(Slice::into_inner(buf), Ok(nbytes))
}
async fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
@@ -676,7 +690,6 @@ where
F: FnMut(tokio_epoll_uring::Slice<B>, u64) -> Fut,
Fut: std::future::Future<Output = (tokio_epoll_uring::Slice<B>, std::io::Result<usize>)>,
{
use tokio_epoll_uring::BoundedBuf;
let mut buf: tokio_epoll_uring::Slice<B> = buf.slice_full(); // includes all the uninitialized memory
while buf.bytes_total() != 0 {
let res;
@@ -1063,10 +1076,19 @@ mod tests {
MaybeVirtualFile::File(file) => file.seek(pos),
}
}
async fn write_all(&mut self, buf: &[u8]) -> Result<(), Error> {
async fn write_all<B: BoundedBuf>(&mut self, buf: B) -> Result<(), Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.write_all(buf).await,
MaybeVirtualFile::File(file) => file.write_all(buf),
MaybeVirtualFile::VirtualFile(file) => {
let (_buf, res) = file.write_all(buf).await;
res.map(|_| ())
}
MaybeVirtualFile::File(file) => {
let buf_len = buf.bytes_init();
if buf_len == 0 {
return Ok(());
}
file.write_all(&buf.slice(0..buf_len))
}
}
}
@@ -1141,7 +1163,7 @@ mod tests {
.to_owned(),
)
.await?;
file_a.write_all(b"foobar").await?;
file_a.write_all(b"foobar".to_vec()).await?;
// cannot read from a file opened in write-only mode
let _ = file_a.read_string().await.unwrap_err();
@@ -1150,7 +1172,7 @@ mod tests {
let mut file_a = openfunc(path_a, OpenOptions::new().read(true).to_owned()).await?;
// cannot write to a file opened in read-only mode
let _ = file_a.write_all(b"bar").await.unwrap_err();
let _ = file_a.write_all(b"bar".to_vec()).await.unwrap_err();
// Try simple read
assert_eq!("foobar", file_a.read_string().await?);
@@ -1293,7 +1315,7 @@ mod tests {
let path = testdir.join("myfile");
let tmp_path = testdir.join("myfile.tmp");
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo")
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo".to_vec())
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
@@ -1302,7 +1324,7 @@ mod tests {
assert!(!tmp_path.exists());
drop(file);
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"bar")
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"bar".to_vec())
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
@@ -1324,7 +1346,7 @@ mod tests {
std::fs::write(&tmp_path, "some preexisting junk that should be removed").unwrap();
assert!(tmp_path.exists());
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo")
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo".to_vec())
.await
.unwrap();