Use virtual_file::Error in interface

This commit is contained in:
John Spray
2023-10-02 15:19:37 +01:00
parent 964463bb0b
commit c60ffde0c8

View File

@@ -14,7 +14,7 @@ use crate::metrics::{StorageIoOperation, STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC
use crate::tenant::TENANTS_SEGMENT_NAME;
use once_cell::sync::OnceCell;
use std::fs::{self, File, OpenOptions};
use std::io::{Error, ErrorKind, Seek, SeekFrom};
use std::io::{ErrorKind, Seek, SeekFrom};
use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
@@ -173,40 +173,8 @@ impl OpenFiles {
}
}
#[derive(Debug, thiserror::Error)]
pub enum CrashsafeOverwriteError {
#[error("final path has no parent dir")]
FinalPathHasNoParentDir,
#[error("remove tempfile: {0}")]
RemovePreviousTempfile(#[source] std::io::Error),
#[error("create tempfile: {0}")]
CreateTempfile(#[source] std::io::Error),
#[error("write tempfile: {0}")]
WriteContents(#[source] std::io::Error),
#[error("sync tempfile: {0}")]
SyncTempfile(#[source] std::io::Error),
#[error("rename tempfile to final path: {0}")]
RenameTempfileToFinalPath(#[source] std::io::Error),
#[error("open final path parent dir: {0}")]
OpenFinalPathParentDir(#[source] std::io::Error),
#[error("sync final path parent dir: {0}")]
SyncFinalPathParentDir(#[source] std::io::Error),
}
impl CrashsafeOverwriteError {
/// Returns true iff the new contents are durably stored.
pub fn are_new_contents_durable(&self) -> bool {
match self {
Self::FinalPathHasNoParentDir => false,
Self::RemovePreviousTempfile(_) => false,
Self::CreateTempfile(_) => false,
Self::WriteContents(_) => false,
Self::SyncTempfile(_) => false,
Self::RenameTempfileToFinalPath(_) => false,
Self::OpenFinalPathParentDir(_) => false,
Self::SyncFinalPathParentDir(_) => true,
}
}
}
const ENOSPC: i32 = 28;
const EBADF: i32 = 9;
/// Call this when the local filesystem gives us an error with an external
/// cause: this includes EIO, EROFS, and EACCESS: all these indicate either
@@ -346,18 +314,21 @@ impl std::fmt::Display for Error {
impl VirtualFile {
/// Open a file in read-only mode. Like File::open.
pub async fn open(path: &Path) -> Result<VirtualFile, std::io::Error> {
Self::open_with_options(path, OpenOptions::new().read(true)).await
pub async fn open(path: &Path) -> Result<VirtualFile, Error> {
Self::open_with_options(path, OpenOptions::new().read(true))
.await
.map_err(Error::from)
}
/// Create a new file for writing. If the file exists, it will be truncated.
/// Like File::create.
pub async fn create(path: &Path) -> Result<VirtualFile, std::io::Error> {
pub async fn create(path: &Path) -> Result<VirtualFile, Error> {
Self::open_with_options(
path,
OpenOptions::new().write(true).create(true).truncate(true),
)
.await
.map_err(Error::from)
}
/// Open a file with given options.
@@ -368,7 +339,7 @@ impl VirtualFile {
pub async fn open_with_options(
path: &Path,
open_options: &OpenOptions,
) -> Result<VirtualFile, std::io::Error> {
) -> Result<VirtualFile, Error> {
let path_str = path.to_string_lossy();
let parts = path_str.split('/').collect::<Vec<&str>>();
let tenant_id;
@@ -420,14 +391,16 @@ impl VirtualFile {
final_path: &Path,
tmp_path: &Path,
content: &[u8],
) -> Result<(), CrashsafeOverwriteError> {
let Some(final_path_parent) = final_path.parent() else {
return Err(CrashsafeOverwriteError::FinalPathHasNoParentDir);
};
) -> Result<(), Error> {
let final_path_parent = final_path.parent().ok_or(std::io::Error::new(
ErrorKind::InvalidInput,
"Path must be absolute",
))?;
match std::fs::remove_file(tmp_path) {
Ok(()) => {}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
Err(e) => return Err(CrashsafeOverwriteError::RemovePreviousTempfile(e)),
Err(e) => return Err(Error::new_with_context(e, "removing tempfile")),
}
let mut file = Self::open_with_options(
tmp_path,
@@ -438,17 +411,17 @@ impl VirtualFile {
.create_new(true),
)
.await
.map_err(CrashsafeOverwriteError::CreateTempfile)?;
.map_err(|e| Error::context(e, "create tempfile"))?;
file.write_all(content)
.await
.map_err(CrashsafeOverwriteError::WriteContents)?;
.map_err(|e| Error::context(e, "write contents"))?;
file.sync_all()
.await
.map_err(CrashsafeOverwriteError::SyncTempfile)?;
.map_err(|e| Error::context(e, "sync tempfile"))?;
drop(file); // before the rename, that's important!
// renames are atomic
std::fs::rename(tmp_path, final_path)
.map_err(CrashsafeOverwriteError::RenameTempfileToFinalPath)?;
.map_err(|e| Error::new_with_context(e, "rename tempfile to final path"))?;
// Only open final path parent dirfd now, so that this operation only
// ever holds one VirtualFile fd at a time. That's important because
// the current `find_victim_slot` impl might pick the same slot for both
@@ -457,11 +430,11 @@ impl VirtualFile {
let final_parent_dirfd =
Self::open_with_options(final_path_parent, OpenOptions::new().read(true))
.await
.map_err(CrashsafeOverwriteError::OpenFinalPathParentDir)?;
.map_err(|e| Error::context(e, "open final path parent"))?;
final_parent_dirfd
.sync_all()
.await
.map_err(CrashsafeOverwriteError::SyncFinalPathParentDir)?;
.map_err(|e| Error::context(e, "sync final path parent"))?;
Ok(())
}
@@ -469,11 +442,13 @@ impl VirtualFile {
pub async fn sync_all(&self) -> Result<(), Error> {
self.with_file(StorageIoOperation::Fsync, |file| file.sync_all())
.await?
.map_err(Error::new)
}
pub async fn metadata(&self) -> Result<fs::Metadata, Error> {
self.with_file(StorageIoOperation::Metadata, |file| file.metadata())
.await?
.map_err(Error::new)
}
/// Helper function that looks up the underlying File for this VirtualFile,
@@ -568,13 +543,10 @@ impl VirtualFile {
SeekFrom::Current(offset) => {
let pos = self.pos as i128 + offset as i128;
if pos < 0 {
return Err(Error::new(
ErrorKind::InvalidInput,
"offset would be negative",
));
return Err(Error::invalid("offset would be negative"));
}
if pos > u64::MAX as i128 {
return Err(Error::new(ErrorKind::InvalidInput, "offset overflow"));
return Err(Error::invalid("offset overflow"));
}
self.pos = pos as u64;
}
@@ -587,10 +559,11 @@ impl VirtualFile {
while !buf.is_empty() {
match self.read_at(buf, offset).await {
Ok(0) => {
return Err(Error::new(
return Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"failed to fill whole buffer",
))
)
.into())
}
Ok(n) => {
buf = &mut buf[n..];
@@ -608,10 +581,11 @@ impl VirtualFile {
while !buf.is_empty() {
match self.write_at(buf, offset).await {
Ok(0) => {
return Err(Error::new(
return Err(std::io::Error::new(
std::io::ErrorKind::WriteZero,
"failed to write whole buffer",
));
)
.into());
}
Ok(n) => {
buf = &buf[n..];
@@ -628,10 +602,11 @@ impl VirtualFile {
while !buf.is_empty() {
match self.write(buf).await {
Ok(0) => {
return Err(Error::new(
return Err(std::io::Error::new(
std::io::ErrorKind::WriteZero,
"failed to write whole buffer",
));
)
.into());
}
Ok(n) => {
buf = &buf[n..];
@@ -643,7 +618,7 @@ impl VirtualFile {
Ok(())
}
async fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
async fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
let pos = self.pos;
let n = self.write_at(buf, pos).await?;
self.pos += n as u64;
@@ -659,7 +634,7 @@ impl VirtualFile {
.with_label_values(&["read", &self.tenant_id, &self.timeline_id])
.add(size as i64);
}
result
result.map_err(Error::new)
}
async fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
@@ -671,7 +646,7 @@ impl VirtualFile {
.with_label_values(&["write", &self.tenant_id, &self.timeline_id])
.add(size as i64);
}
result
result.map_err(Error::new)
}
}
@@ -680,7 +655,7 @@ impl VirtualFile {
pub(crate) async fn read_blk(
&self,
blknum: u32,
) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
) -> Result<crate::tenant::block_io::BlockLease<'_>, Error> {
use crate::page_cache::PAGE_SZ;
let mut buf = [0; PAGE_SZ];
self.read_exact_at(&mut buf, blknum as u64 * (PAGE_SZ as u64))
@@ -796,25 +771,25 @@ mod tests {
async fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> Result<(), Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset).await,
MaybeVirtualFile::File(file) => file.read_exact_at(buf, offset),
MaybeVirtualFile::File(file) => file.read_exact_at(buf, offset).map_err(Error::new),
}
}
async fn write_all_at(&self, buf: &[u8], offset: u64) -> Result<(), Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.write_all_at(buf, offset).await,
MaybeVirtualFile::File(file) => file.write_all_at(buf, offset),
MaybeVirtualFile::File(file) => file.write_all_at(buf, offset).map_err(Error::new),
}
}
async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.seek(pos).await,
MaybeVirtualFile::File(file) => file.seek(pos),
MaybeVirtualFile::File(file) => file.seek(pos).map_err(Error::new),
}
}
async fn write_all(&mut self, buf: &[u8]) -> Result<(), Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.write_all(buf).await,
MaybeVirtualFile::File(file) => file.write_all(buf),
MaybeVirtualFile::File(file) => file.write_all(buf).map_err(Error::new),
}
}
@@ -1023,7 +998,7 @@ mod tests {
hdls.push(hdl);
}
for hdl in hdls {
hdl.await?;
hdl.await.expect("joining")
}
std::mem::forget(rt);