mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-05 19:50:36 +00:00
Compare commits
11 Commits
erik/postg
...
jcsp/termi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6bd443e3f7 | ||
|
|
dd54c7e687 | ||
|
|
89bc3aef1a | ||
|
|
0a502f4117 | ||
|
|
2e19940674 | ||
|
|
682f1df2ee | ||
|
|
c60ffde0c8 | ||
|
|
964463bb0b | ||
|
|
8df507ccea | ||
|
|
cc2c1a8bf4 | ||
|
|
218b514498 |
@@ -34,6 +34,8 @@ use crate::deletion_queue::TEMP_SUFFIX;
|
||||
use crate::metrics;
|
||||
use crate::tenant::remote_timeline_client::remote_layer_path;
|
||||
use crate::tenant::storage_layer::LayerFileName;
|
||||
use crate::virtual_file;
|
||||
use crate::virtual_file::on_fatal_io_error;
|
||||
|
||||
// The number of keys in a DeletionList before we will proactively persist it
|
||||
// (without reaching a flush deadline). This aims to deliver objects of the order
|
||||
@@ -195,7 +197,7 @@ impl ListWriter {
|
||||
debug!("Deletion header {header_path} not found, first start?");
|
||||
Ok(None)
|
||||
} else {
|
||||
Err(anyhow::anyhow!(e))
|
||||
on_fatal_io_error(&e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -221,9 +223,9 @@ impl ListWriter {
|
||||
Err(e) => {
|
||||
warn!("Failed to open deletion list directory {deletion_directory}: {e:#}");
|
||||
|
||||
// Give up: if we can't read the deletion list directory, we probably can't
|
||||
// write lists into it later, so the queue won't work.
|
||||
return Err(e.into());
|
||||
// This is fatal: any failure to read this local directory indicates a
|
||||
// storage problem or configuration problem of the node.
|
||||
virtual_file::on_fatal_io_error(&e);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -249,6 +251,8 @@ impl ListWriter {
|
||||
// Non-fatal error: we will just leave the file behind but not
|
||||
// try and load it.
|
||||
warn!("Failed to clean up temporary file {absolute_path}: {e:#}");
|
||||
|
||||
virtual_file::on_fatal_io_error(&e);
|
||||
}
|
||||
|
||||
continue;
|
||||
@@ -261,7 +265,7 @@ impl ListWriter {
|
||||
.expect("Non optional group should be present")
|
||||
.as_str()
|
||||
} else {
|
||||
warn!("Unexpected key in deletion queue: {basename}");
|
||||
warn!("Unexpected filename in deletion queue: {basename}");
|
||||
metrics::DELETION_QUEUE.unexpected_errors.inc();
|
||||
continue;
|
||||
};
|
||||
@@ -289,7 +293,12 @@ impl ListWriter {
|
||||
for s in seqs {
|
||||
let list_path = self.conf.deletion_list_path(s);
|
||||
|
||||
let list_bytes = tokio::fs::read(&list_path).await?;
|
||||
let list_bytes = match tokio::fs::read(&list_path).await {
|
||||
Ok(b) => b,
|
||||
Err(e) => {
|
||||
virtual_file::on_fatal_io_error(&e);
|
||||
}
|
||||
};
|
||||
|
||||
let mut deletion_list = match serde_json::from_slice::<DeletionList>(&list_bytes) {
|
||||
Ok(l) => l,
|
||||
|
||||
@@ -28,6 +28,7 @@ use crate::config::PageServerConf;
|
||||
use crate::control_plane_client::ControlPlaneGenerationsApi;
|
||||
use crate::control_plane_client::RetryForeverError;
|
||||
use crate::metrics;
|
||||
use crate::virtual_file::on_fatal_io_error;
|
||||
|
||||
use super::deleter::DeleterMessage;
|
||||
use super::DeletionHeader;
|
||||
@@ -116,6 +117,11 @@ where
|
||||
/// Valid LSN updates propagate back to Timelines immediately, valid DeletionLists
|
||||
/// go into the queue of ready-to-execute lists.
|
||||
async fn validate(&mut self) -> Result<(), DeletionQueueError> {
|
||||
// Figure out for each tenant which generation number to validate.
|
||||
//
|
||||
// It is sufficient to validate the max generation number of each tenant because only the
|
||||
// highest generation number can possibly be valid. Hence this map will collect the
|
||||
// highest generation pending validation for each tenant.
|
||||
let mut tenant_generations = HashMap::new();
|
||||
for list in &self.pending_lists {
|
||||
for (tenant_id, tenant_list) in &list.tenants {
|
||||
@@ -246,6 +252,11 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
// Assert monotonicity of the list sequence numbers we are processing
|
||||
if let Some(validated) = validated_sequence {
|
||||
assert!(list.sequence >= validated)
|
||||
}
|
||||
|
||||
validated_sequence = Some(list.sequence);
|
||||
}
|
||||
|
||||
@@ -293,7 +304,8 @@ where
|
||||
// issue (probably permissions) has been fixed by then.
|
||||
tracing::error!("Failed to delete {list_path}: {e:#}");
|
||||
metrics::DELETION_QUEUE.unexpected_errors.inc();
|
||||
break;
|
||||
|
||||
on_fatal_io_error(&e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -234,7 +234,10 @@ impl BlobWriter<false> {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::{context::DownloadBehavior, task_mgr::TaskKind, tenant::block_io::BlockReaderRef};
|
||||
use crate::{
|
||||
context::DownloadBehavior, task_mgr::TaskKind, tenant::block_io::BlockReaderRef,
|
||||
virtual_file::Error,
|
||||
};
|
||||
use rand::{Rng, SeedableRng};
|
||||
|
||||
async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
|
||||
|
||||
@@ -6,7 +6,7 @@ use super::ephemeral_file::EphemeralFile;
|
||||
use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
|
||||
use crate::context::RequestContext;
|
||||
use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ};
|
||||
use crate::virtual_file::VirtualFile;
|
||||
use crate::virtual_file::{self, VirtualFile};
|
||||
use bytes::Bytes;
|
||||
use std::ops::{Deref, DerefMut};
|
||||
|
||||
@@ -96,7 +96,7 @@ impl<'a> BlockReaderRef<'a> {
|
||||
#[cfg(test)]
|
||||
TestDisk(r) => r.read_blk(blknum),
|
||||
#[cfg(test)]
|
||||
VirtualFile(r) => r.read_blk(blknum).await,
|
||||
VirtualFile(r) => r.read_blk(blknum).await.map_err(virtual_file::Error::into),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -174,6 +174,7 @@ impl FileBlockReader {
|
||||
self.file
|
||||
.read_exact_at(buf, blkno as u64 * PAGE_SZ as u64)
|
||||
.await
|
||||
.map_err(virtual_file::Error::into)
|
||||
}
|
||||
/// Read a block.
|
||||
///
|
||||
|
||||
@@ -15,7 +15,7 @@ use crate::tenant::TENANTS_SEGMENT_NAME;
|
||||
use camino::{Utf8Path, Utf8PathBuf};
|
||||
use once_cell::sync::OnceCell;
|
||||
use std::fs::{self, File, OpenOptions};
|
||||
use std::io::{Error, ErrorKind, Seek, SeekFrom};
|
||||
use std::io::{ErrorKind, Seek, SeekFrom};
|
||||
use std::os::unix::fs::FileExt;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::{RwLock, RwLockWriteGuard};
|
||||
@@ -173,55 +173,148 @@ impl OpenFiles {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum CrashsafeOverwriteError {
|
||||
#[error("final path has no parent dir")]
|
||||
FinalPathHasNoParentDir,
|
||||
#[error("remove tempfile")]
|
||||
RemovePreviousTempfile(#[source] std::io::Error),
|
||||
#[error("create tempfile")]
|
||||
CreateTempfile(#[source] std::io::Error),
|
||||
#[error("write tempfile")]
|
||||
WriteContents(#[source] std::io::Error),
|
||||
#[error("sync tempfile")]
|
||||
SyncTempfile(#[source] std::io::Error),
|
||||
#[error("rename tempfile to final path")]
|
||||
RenameTempfileToFinalPath(#[source] std::io::Error),
|
||||
#[error("open final path parent dir")]
|
||||
OpenFinalPathParentDir(#[source] std::io::Error),
|
||||
#[error("sync final path parent dir")]
|
||||
SyncFinalPathParentDir(#[source] std::io::Error),
|
||||
/// Call this when the local filesystem gives us an error with an external
|
||||
/// cause: this includes EIO, EROFS, and EACCESS: all these indicate either
|
||||
/// bad storage or bad configuration, and we can't fix that from inside
|
||||
/// a running process.
|
||||
pub(crate) fn on_fatal_io_error(e: &std::io::Error) -> ! {
|
||||
tracing::error!("Fatal I/O error: {}", &e);
|
||||
std::process::abort();
|
||||
}
|
||||
impl CrashsafeOverwriteError {
|
||||
/// Returns true iff the new contents are durably stored.
|
||||
pub fn are_new_contents_durable(&self) -> bool {
|
||||
match self {
|
||||
Self::FinalPathHasNoParentDir => false,
|
||||
Self::RemovePreviousTempfile(_) => false,
|
||||
Self::CreateTempfile(_) => false,
|
||||
Self::WriteContents(_) => false,
|
||||
Self::SyncTempfile(_) => false,
|
||||
Self::RenameTempfileToFinalPath(_) => false,
|
||||
Self::OpenFinalPathParentDir(_) => false,
|
||||
Self::SyncFinalPathParentDir(_) => true,
|
||||
|
||||
/// Identify error types that should alwways terminate the process. Other
|
||||
/// error types may be elegible for retry.
|
||||
pub(crate) fn is_fatal_io_error(e: &std::io::Error) -> bool {
|
||||
use nix::errno::Errno::*;
|
||||
match e.raw_os_error().map(nix::errno::from_i32) {
|
||||
Some(EIO) => {
|
||||
// Terminate on EIO because we no longer trust the device to store
|
||||
// data safely, or to uphold persistence guarantees on fsync.
|
||||
true
|
||||
}
|
||||
Some(EROFS) => {
|
||||
// Terminate on EROFS because a filesystem is usually remounted
|
||||
// readonly when it has experienced some critical issue, so the same
|
||||
// logic as EIO applies.
|
||||
true
|
||||
}
|
||||
Some(EACCES) => {
|
||||
// Terminate on EACCESS because we should always have permissions
|
||||
// for our own data dir: if we don't, then we can't do our job and
|
||||
// need administrative intervention to fix permissions. Terminating
|
||||
// is the best way to make sure we stop cleanly rather than going
|
||||
// into infinite retry loops, and will make it clear to the outside
|
||||
// world that we need help.
|
||||
true
|
||||
}
|
||||
_ => {
|
||||
// Treat all other local file I/O errors are retryable. This includes:
|
||||
// - ENOSPC: we stay up and wait for eviction to free some space
|
||||
// - EINVAL, EBADF, EBADFD: this is a code bug, not a filesystem/hardware issue
|
||||
// - WriteZero, Interrupted: these are used internally VirtualFile
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Wrap std::io::Error with a behavior where we will terminate the process
|
||||
/// on most I/O errors from local storage. The rational for terminating is:
|
||||
/// - EIO means we can't trust the drive any more
|
||||
/// - EROFS means the local filesystem or drive is damaged, we shouldn't use it any more
|
||||
/// - EACCESS means something is fatally misconfigured about the pageserver, such
|
||||
/// as running the process as the wrong user, or the filesystem having the wrong
|
||||
/// ownership or permission bits. We terminate so that it's obvious to
|
||||
/// the operator why the pageserver isn't working, and they can restart it when
|
||||
/// they've fixed the problem.
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub struct Error {
|
||||
inner: std::io::Error,
|
||||
context: Option<String>,
|
||||
}
|
||||
|
||||
impl Error {
|
||||
/// Wrap a io::Error with some context & terminate
|
||||
/// the process if the io::Error matches our policy for termination
|
||||
fn new_with_context(e: std::io::Error, context: &str) -> Self {
|
||||
Self::build(e, Some(context.to_string()))
|
||||
}
|
||||
|
||||
fn context(e: Self, context: &str) -> Self {
|
||||
Self {
|
||||
inner: e.inner,
|
||||
context: Some(context.to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
fn new(e: std::io::Error) -> Self {
|
||||
Self::build(e, None)
|
||||
}
|
||||
|
||||
fn invalid(reason: &str) -> Self {
|
||||
Self::new(std::io::Error::new(ErrorKind::InvalidInput, reason))
|
||||
}
|
||||
|
||||
fn build(e: std::io::Error, context: Option<String>) -> Self {
|
||||
// Construct instance early so that we have it for
|
||||
// using Display in termination message.
|
||||
let instance = Self { inner: e, context };
|
||||
|
||||
// Maybe terminate: this violates the usual expectation that callers
|
||||
// should make their own decisions about how to handle an Error, but
|
||||
// it's worthwhile to avoid every single user of the local filesystem
|
||||
// having to apply the same "terminate on errors" behavior.
|
||||
if is_fatal_io_error(&instance.inner) {
|
||||
on_fatal_io_error(&instance.inner);
|
||||
}
|
||||
|
||||
instance
|
||||
}
|
||||
|
||||
fn kind(&self) -> ErrorKind {
|
||||
self.inner.kind()
|
||||
}
|
||||
}
|
||||
|
||||
impl From<std::io::Error> for Error {
|
||||
fn from(e: std::io::Error) -> Self {
|
||||
Self::build(e, None)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Error> for std::io::Error {
|
||||
fn from(e: Error) -> std::io::Error {
|
||||
e.inner
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for Error {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match &self.context {
|
||||
Some(context) => {
|
||||
write!(f, "{}: {}", context, self.inner)
|
||||
}
|
||||
None => self.inner.fmt(f),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl VirtualFile {
|
||||
/// Open a file in read-only mode. Like File::open.
|
||||
pub async fn open(path: &Utf8Path) -> Result<VirtualFile, std::io::Error> {
|
||||
Self::open_with_options(path, OpenOptions::new().read(true)).await
|
||||
pub async fn open(path: &Utf8Path) -> Result<VirtualFile, Error> {
|
||||
Self::open_with_options(path, OpenOptions::new().read(true))
|
||||
.await
|
||||
.map_err(Error::from)
|
||||
}
|
||||
|
||||
/// Create a new file for writing. If the file exists, it will be truncated.
|
||||
/// Like File::create.
|
||||
pub async fn create(path: &Utf8Path) -> Result<VirtualFile, std::io::Error> {
|
||||
pub async fn create(path: &Utf8Path) -> Result<VirtualFile, Error> {
|
||||
Self::open_with_options(
|
||||
path,
|
||||
OpenOptions::new().write(true).create(true).truncate(true),
|
||||
)
|
||||
.await
|
||||
.map_err(Error::from)
|
||||
}
|
||||
|
||||
/// Open a file with given options.
|
||||
@@ -232,7 +325,7 @@ impl VirtualFile {
|
||||
pub async fn open_with_options(
|
||||
path: &Utf8Path,
|
||||
open_options: &OpenOptions,
|
||||
) -> Result<VirtualFile, std::io::Error> {
|
||||
) -> Result<VirtualFile, Error> {
|
||||
let path_str = path.to_string();
|
||||
let parts = path_str.split('/').collect::<Vec<&str>>();
|
||||
let tenant_id;
|
||||
@@ -284,14 +377,16 @@ impl VirtualFile {
|
||||
final_path: &Utf8Path,
|
||||
tmp_path: &Utf8Path,
|
||||
content: &[u8],
|
||||
) -> Result<(), CrashsafeOverwriteError> {
|
||||
let Some(final_path_parent) = final_path.parent() else {
|
||||
return Err(CrashsafeOverwriteError::FinalPathHasNoParentDir);
|
||||
};
|
||||
) -> Result<(), Error> {
|
||||
let final_path_parent = final_path.parent().ok_or(std::io::Error::new(
|
||||
ErrorKind::InvalidInput,
|
||||
"Path must be absolute",
|
||||
))?;
|
||||
|
||||
match std::fs::remove_file(tmp_path) {
|
||||
Ok(()) => {}
|
||||
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
|
||||
Err(e) => return Err(CrashsafeOverwriteError::RemovePreviousTempfile(e)),
|
||||
Err(e) => return Err(Error::new_with_context(e, "removing tempfile")),
|
||||
}
|
||||
let mut file = Self::open_with_options(
|
||||
tmp_path,
|
||||
@@ -302,17 +397,17 @@ impl VirtualFile {
|
||||
.create_new(true),
|
||||
)
|
||||
.await
|
||||
.map_err(CrashsafeOverwriteError::CreateTempfile)?;
|
||||
.map_err(|e| Error::context(e, "create tempfile"))?;
|
||||
file.write_all(content)
|
||||
.await
|
||||
.map_err(CrashsafeOverwriteError::WriteContents)?;
|
||||
.map_err(|e| Error::context(e, "write contents"))?;
|
||||
file.sync_all()
|
||||
.await
|
||||
.map_err(CrashsafeOverwriteError::SyncTempfile)?;
|
||||
.map_err(|e| Error::context(e, "sync tempfile"))?;
|
||||
drop(file); // before the rename, that's important!
|
||||
// renames are atomic
|
||||
std::fs::rename(tmp_path, final_path)
|
||||
.map_err(CrashsafeOverwriteError::RenameTempfileToFinalPath)?;
|
||||
.map_err(|e| Error::new_with_context(e, "rename tempfile to final path"))?;
|
||||
// Only open final path parent dirfd now, so that this operation only
|
||||
// ever holds one VirtualFile fd at a time. That's important because
|
||||
// the current `find_victim_slot` impl might pick the same slot for both
|
||||
@@ -321,11 +416,11 @@ impl VirtualFile {
|
||||
let final_parent_dirfd =
|
||||
Self::open_with_options(final_path_parent, OpenOptions::new().read(true))
|
||||
.await
|
||||
.map_err(CrashsafeOverwriteError::OpenFinalPathParentDir)?;
|
||||
.map_err(|e| Error::context(e, "open final path parent"))?;
|
||||
final_parent_dirfd
|
||||
.sync_all()
|
||||
.await
|
||||
.map_err(CrashsafeOverwriteError::SyncFinalPathParentDir)?;
|
||||
.map_err(|e| Error::context(e, "sync final path parent"))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -333,11 +428,13 @@ impl VirtualFile {
|
||||
pub async fn sync_all(&self) -> Result<(), Error> {
|
||||
self.with_file(StorageIoOperation::Fsync, |file| file.sync_all())
|
||||
.await?
|
||||
.map_err(Error::new)
|
||||
}
|
||||
|
||||
pub async fn metadata(&self) -> Result<fs::Metadata, Error> {
|
||||
self.with_file(StorageIoOperation::Metadata, |file| file.metadata())
|
||||
.await?
|
||||
.map_err(Error::new)
|
||||
}
|
||||
|
||||
/// Helper function that looks up the underlying File for this VirtualFile,
|
||||
@@ -432,13 +529,10 @@ impl VirtualFile {
|
||||
SeekFrom::Current(offset) => {
|
||||
let pos = self.pos as i128 + offset as i128;
|
||||
if pos < 0 {
|
||||
return Err(Error::new(
|
||||
ErrorKind::InvalidInput,
|
||||
"offset would be negative",
|
||||
));
|
||||
return Err(Error::invalid("offset would be negative"));
|
||||
}
|
||||
if pos > u64::MAX as i128 {
|
||||
return Err(Error::new(ErrorKind::InvalidInput, "offset overflow"));
|
||||
return Err(Error::invalid("offset overflow"));
|
||||
}
|
||||
self.pos = pos as u64;
|
||||
}
|
||||
@@ -451,10 +545,11 @@ impl VirtualFile {
|
||||
while !buf.is_empty() {
|
||||
match self.read_at(buf, offset).await {
|
||||
Ok(0) => {
|
||||
return Err(Error::new(
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::UnexpectedEof,
|
||||
"failed to fill whole buffer",
|
||||
))
|
||||
)
|
||||
.into())
|
||||
}
|
||||
Ok(n) => {
|
||||
buf = &mut buf[n..];
|
||||
@@ -472,10 +567,11 @@ impl VirtualFile {
|
||||
while !buf.is_empty() {
|
||||
match self.write_at(buf, offset).await {
|
||||
Ok(0) => {
|
||||
return Err(Error::new(
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::WriteZero,
|
||||
"failed to write whole buffer",
|
||||
));
|
||||
)
|
||||
.into());
|
||||
}
|
||||
Ok(n) => {
|
||||
buf = &buf[n..];
|
||||
@@ -492,10 +588,11 @@ impl VirtualFile {
|
||||
while !buf.is_empty() {
|
||||
match self.write(buf).await {
|
||||
Ok(0) => {
|
||||
return Err(Error::new(
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::WriteZero,
|
||||
"failed to write whole buffer",
|
||||
));
|
||||
)
|
||||
.into());
|
||||
}
|
||||
Ok(n) => {
|
||||
buf = &buf[n..];
|
||||
@@ -507,7 +604,7 @@ impl VirtualFile {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
|
||||
async fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
|
||||
let pos = self.pos;
|
||||
let n = self.write_at(buf, pos).await?;
|
||||
self.pos += n as u64;
|
||||
@@ -523,7 +620,7 @@ impl VirtualFile {
|
||||
.with_label_values(&["read", &self.tenant_id, &self.timeline_id])
|
||||
.add(size as i64);
|
||||
}
|
||||
result
|
||||
result.map_err(Error::new)
|
||||
}
|
||||
|
||||
async fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
|
||||
@@ -535,7 +632,7 @@ impl VirtualFile {
|
||||
.with_label_values(&["write", &self.tenant_id, &self.timeline_id])
|
||||
.add(size as i64);
|
||||
}
|
||||
result
|
||||
result.map_err(Error::new)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -544,7 +641,7 @@ impl VirtualFile {
|
||||
pub(crate) async fn read_blk(
|
||||
&self,
|
||||
blknum: u32,
|
||||
) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
|
||||
) -> Result<crate::tenant::block_io::BlockLease<'_>, Error> {
|
||||
use crate::page_cache::PAGE_SZ;
|
||||
let mut buf = [0; PAGE_SZ];
|
||||
self.read_exact_at(&mut buf, blknum as u64 * (PAGE_SZ as u64))
|
||||
@@ -660,25 +757,25 @@ mod tests {
|
||||
async fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> Result<(), Error> {
|
||||
match self {
|
||||
MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset).await,
|
||||
MaybeVirtualFile::File(file) => file.read_exact_at(buf, offset),
|
||||
MaybeVirtualFile::File(file) => file.read_exact_at(buf, offset).map_err(Error::new),
|
||||
}
|
||||
}
|
||||
async fn write_all_at(&self, buf: &[u8], offset: u64) -> Result<(), Error> {
|
||||
match self {
|
||||
MaybeVirtualFile::VirtualFile(file) => file.write_all_at(buf, offset).await,
|
||||
MaybeVirtualFile::File(file) => file.write_all_at(buf, offset),
|
||||
MaybeVirtualFile::File(file) => file.write_all_at(buf, offset).map_err(Error::new),
|
||||
}
|
||||
}
|
||||
async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
|
||||
match self {
|
||||
MaybeVirtualFile::VirtualFile(file) => file.seek(pos).await,
|
||||
MaybeVirtualFile::File(file) => file.seek(pos),
|
||||
MaybeVirtualFile::File(file) => file.seek(pos).map_err(Error::new),
|
||||
}
|
||||
}
|
||||
async fn write_all(&mut self, buf: &[u8]) -> Result<(), Error> {
|
||||
match self {
|
||||
MaybeVirtualFile::VirtualFile(file) => file.write_all(buf).await,
|
||||
MaybeVirtualFile::File(file) => file.write_all(buf),
|
||||
MaybeVirtualFile::File(file) => file.write_all(buf).map_err(Error::new),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -887,7 +984,7 @@ mod tests {
|
||||
hdls.push(hdl);
|
||||
}
|
||||
for hdl in hdls {
|
||||
hdl.await?;
|
||||
hdl.await.expect("joining")
|
||||
}
|
||||
std::mem::forget(rt);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user