Compare commits

...

11 Commits

Author  SHA1  Message  Date
John Spray  6bd443e3f7  Revise is_fatal_io_error to use allow list  2023-10-05 10:09:49 +01:00
John Spray  dd54c7e687  Clean up unreachable blocks after fatal_io_error  2023-10-05 09:58:09 +01:00
John Spray  89bc3aef1a  Merge remote-tracking branch 'upstream/main' into jcsp/terminate-on-io-errors  2023-10-05 09:57:01 +01:00
John Spray  0a502f4117  Use nix errno constants  2023-10-05 09:50:48 +01:00
John Spray  2e19940674  Update pageserver/src/virtual_file.rs (Co-authored-by: Joonas Koivunen <joonas@neon.tech>)  2023-10-05 09:47:58 +01:00
John Spray  682f1df2ee  Adapt block_io/blob_io to virtual_file::Error  2023-10-02 15:38:08 +01:00
John Spray  c60ffde0c8  Use virtual_file::Error in interface  2023-10-02 15:19:37 +01:00
John Spray  964463bb0b  Define a virtual_file::Error type that auto-terminates  2023-10-02 15:18:23 +01:00
John Spray  8df507ccea  pageserver: make I/O errors in deletion queue fatal  2023-10-02 14:28:04 +01:00
John Spray  cc2c1a8bf4  pageserver: add hook for terminating on I/O errors  2023-10-02 14:28:04 +01:00
John Spray  218b514498  pageserver: deletion queue nits  2023-10-02 11:41:10 +01:00
5 changed files with 198 additions and 76 deletions

View File

@@ -34,6 +34,8 @@ use crate::deletion_queue::TEMP_SUFFIX;
use crate::metrics;
use crate::tenant::remote_timeline_client::remote_layer_path;
use crate::tenant::storage_layer::LayerFileName;
use crate::virtual_file;
use crate::virtual_file::on_fatal_io_error;
// The number of keys in a DeletionList before we will proactively persist it
// (without reaching a flush deadline). This aims to deliver objects of the order
@@ -195,7 +197,7 @@ impl ListWriter {
debug!("Deletion header {header_path} not found, first start?");
Ok(None)
} else {
Err(anyhow::anyhow!(e))
on_fatal_io_error(&e);
}
}
}
@@ -221,9 +223,9 @@ impl ListWriter {
Err(e) => {
warn!("Failed to open deletion list directory {deletion_directory}: {e:#}");
// Give up: if we can't read the deletion list directory, we probably can't
// write lists into it later, so the queue won't work.
return Err(e.into());
// This is fatal: any failure to read this local directory indicates a
// storage or configuration problem on the node.
virtual_file::on_fatal_io_error(&e);
}
};
@@ -249,6 +251,8 @@ impl ListWriter {
// Non-fatal error: we will just leave the file behind but not
// try and load it.
warn!("Failed to clean up temporary file {absolute_path}: {e:#}");
virtual_file::on_fatal_io_error(&e);
}
continue;
@@ -261,7 +265,7 @@ impl ListWriter {
.expect("Non optional group should be present")
.as_str()
} else {
warn!("Unexpected key in deletion queue: {basename}");
warn!("Unexpected filename in deletion queue: {basename}");
metrics::DELETION_QUEUE.unexpected_errors.inc();
continue;
};
@@ -289,7 +293,12 @@ impl ListWriter {
for s in seqs {
let list_path = self.conf.deletion_list_path(s);
let list_bytes = tokio::fs::read(&list_path).await?;
let list_bytes = match tokio::fs::read(&list_path).await {
Ok(b) => b,
Err(e) => {
virtual_file::on_fatal_io_error(&e);
}
};
let mut deletion_list = match serde_json::from_slice::<DeletionList>(&list_bytes) {
Ok(l) => l,
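
The changes in this file follow one pattern: expected conditions (such as a missing header on first start) are still handled, while any other local read failure goes to on_fatal_io_error and terminates the process. A minimal standalone sketch of that split, using blocking std::fs in place of the tokio::fs calls above (read_header and the path are hypothetical; on_fatal_io_error here is a local stand-in for the helper added in virtual_file.rs):

use std::io::ErrorKind;

// Local stand-in for crate::virtual_file::on_fatal_io_error, for illustration only.
fn on_fatal_io_error(e: &std::io::Error) -> ! {
    eprintln!("Fatal I/O error: {e}");
    std::process::abort();
}

/// Read a header file; a missing file is expected on first start,
/// any other local I/O failure terminates the process.
fn read_header(path: &std::path::Path) -> Option<Vec<u8>> {
    match std::fs::read(path) {
        Ok(bytes) => Some(bytes),
        Err(e) if e.kind() == ErrorKind::NotFound => None,
        Err(e) => on_fatal_io_error(&e),
    }
}

fn main() {
    let hdr = read_header(std::path::Path::new("/tmp/deletion-header.json"));
    println!("header present: {}", hdr.is_some());
}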

View File

@@ -28,6 +28,7 @@ use crate::config::PageServerConf;
use crate::control_plane_client::ControlPlaneGenerationsApi;
use crate::control_plane_client::RetryForeverError;
use crate::metrics;
use crate::virtual_file::on_fatal_io_error;
use super::deleter::DeleterMessage;
use super::DeletionHeader;
@@ -116,6 +117,11 @@ where
/// Valid LSN updates propagate back to Timelines immediately, valid DeletionLists
/// go into the queue of ready-to-execute lists.
async fn validate(&mut self) -> Result<(), DeletionQueueError> {
// Figure out for each tenant which generation number to validate.
//
// It is sufficient to validate the max generation number of each tenant because only the
// highest generation number can possibly be valid. Hence this map will collect the
// highest generation pending validation for each tenant.
let mut tenant_generations = HashMap::new();
for list in &self.pending_lists {
for (tenant_id, tenant_list) in &list.tenants {
@@ -246,6 +252,11 @@ where
}
}
// Assert monotonicity of the list sequence numbers we are processing
if let Some(validated) = validated_sequence {
assert!(list.sequence >= validated)
}
validated_sequence = Some(list.sequence);
}
@@ -293,7 +304,8 @@ where
// issue (probably permissions) has been fixed by then.
tracing::error!("Failed to delete {list_path}: {e:#}");
metrics::DELETION_QUEUE.unexpected_errors.inc();
break;
on_fatal_io_error(&e);
}
}
}
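
The comment added above ("only the highest generation number can possibly be valid") boils down to a per-tenant max over the pending lists. A simplified, self-contained sketch of that aggregation, with plain tuples standing in for the real TenantId/DeletionList types:

use std::collections::HashMap;

// Sketch: collect, per tenant, the highest generation that is pending
// validation. Only the max can possibly still be valid, so validating it
// is sufficient. Types here are simplified stand-ins for the real ones.
type TenantId = u32;
type Generation = u32;

fn max_pending_generations(
    pending_lists: &[Vec<(TenantId, Generation)>],
) -> HashMap<TenantId, Generation> {
    let mut tenant_generations: HashMap<TenantId, Generation> = HashMap::new();
    for list in pending_lists {
        for &(tenant_id, generation) in list {
            tenant_generations
                .entry(tenant_id)
                .and_modify(|g| *g = (*g).max(generation))
                .or_insert(generation);
        }
    }
    tenant_generations
}

fn main() {
    let lists = vec![vec![(1, 3), (2, 5)], vec![(1, 7)]];
    let max = max_pending_generations(&lists);
    assert_eq!(max[&1], 7);
    assert_eq!(max[&2], 5);
}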

View File

@@ -234,7 +234,10 @@ impl BlobWriter<false> {
#[cfg(test)]
mod tests {
use super::*;
use crate::{context::DownloadBehavior, task_mgr::TaskKind, tenant::block_io::BlockReaderRef};
use crate::{
context::DownloadBehavior, task_mgr::TaskKind, tenant::block_io::BlockReaderRef,
virtual_file::Error,
};
use rand::{Rng, SeedableRng};
async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {

View File

@@ -6,7 +6,7 @@ use super::ephemeral_file::EphemeralFile;
use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
use crate::context::RequestContext;
use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ};
use crate::virtual_file::VirtualFile;
use crate::virtual_file::{self, VirtualFile};
use bytes::Bytes;
use std::ops::{Deref, DerefMut};
@@ -96,7 +96,7 @@ impl<'a> BlockReaderRef<'a> {
#[cfg(test)]
TestDisk(r) => r.read_blk(blknum),
#[cfg(test)]
VirtualFile(r) => r.read_blk(blknum).await,
VirtualFile(r) => r.read_blk(blknum).await.map_err(virtual_file::Error::into),
}
}
}
@@ -174,6 +174,7 @@ impl FileBlockReader {
self.file
.read_exact_at(buf, blkno as u64 * PAGE_SZ as u64)
.await
.map_err(virtual_file::Error::into)
}
/// Read a block.
///

View File

@@ -15,7 +15,7 @@ use crate::tenant::TENANTS_SEGMENT_NAME;
use camino::{Utf8Path, Utf8PathBuf};
use once_cell::sync::OnceCell;
use std::fs::{self, File, OpenOptions};
use std::io::{Error, ErrorKind, Seek, SeekFrom};
use std::io::{ErrorKind, Seek, SeekFrom};
use std::os::unix::fs::FileExt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{RwLock, RwLockWriteGuard};
@@ -173,55 +173,148 @@ impl OpenFiles {
}
}
#[derive(Debug, thiserror::Error)]
pub enum CrashsafeOverwriteError {
#[error("final path has no parent dir")]
FinalPathHasNoParentDir,
#[error("remove tempfile")]
RemovePreviousTempfile(#[source] std::io::Error),
#[error("create tempfile")]
CreateTempfile(#[source] std::io::Error),
#[error("write tempfile")]
WriteContents(#[source] std::io::Error),
#[error("sync tempfile")]
SyncTempfile(#[source] std::io::Error),
#[error("rename tempfile to final path")]
RenameTempfileToFinalPath(#[source] std::io::Error),
#[error("open final path parent dir")]
OpenFinalPathParentDir(#[source] std::io::Error),
#[error("sync final path parent dir")]
SyncFinalPathParentDir(#[source] std::io::Error),
/// Call this when the local filesystem gives us an error with an external
/// cause: this includes EIO, EROFS, and EACCES: all these indicate either
/// bad storage or bad configuration, and we can't fix that from inside
/// a running process.
pub(crate) fn on_fatal_io_error(e: &std::io::Error) -> ! {
tracing::error!("Fatal I/O error: {}", &e);
std::process::abort();
}
impl CrashsafeOverwriteError {
/// Returns true iff the new contents are durably stored.
pub fn are_new_contents_durable(&self) -> bool {
match self {
Self::FinalPathHasNoParentDir => false,
Self::RemovePreviousTempfile(_) => false,
Self::CreateTempfile(_) => false,
Self::WriteContents(_) => false,
Self::SyncTempfile(_) => false,
Self::RenameTempfileToFinalPath(_) => false,
Self::OpenFinalPathParentDir(_) => false,
Self::SyncFinalPathParentDir(_) => true,
/// Identify error types that should always terminate the process. Other
/// error types may be eligible for retry.
pub(crate) fn is_fatal_io_error(e: &std::io::Error) -> bool {
use nix::errno::Errno::*;
match e.raw_os_error().map(nix::errno::from_i32) {
Some(EIO) => {
// Terminate on EIO because we no longer trust the device to store
// data safely, or to uphold persistence guarantees on fsync.
true
}
Some(EROFS) => {
// Terminate on EROFS because a filesystem is usually remounted
// readonly when it has experienced some critical issue, so the same
// logic as EIO applies.
true
}
Some(EACCES) => {
// Terminate on EACCES because we should always have permissions
// for our own data dir: if we don't, then we can't do our job and
// need administrative intervention to fix permissions. Terminating
// is the best way to make sure we stop cleanly rather than going
// into infinite retry loops, and will make it clear to the outside
// world that we need help.
true
}
_ => {
// Treat all other local file I/O errors as retryable. This includes:
// - ENOSPC: we stay up and wait for eviction to free some space
// - EINVAL, EBADF, EBADFD: this is a code bug, not a filesystem/hardware issue
// - WriteZero, Interrupted: these are used internally by VirtualFile
false
}
}
}
/// Wrap std::io::Error with a behavior where we will terminate the process
/// on most I/O errors from local storage. The rationale for terminating is:
/// - EIO means we can't trust the drive any more
/// - EROFS means the local filesystem or drive is damaged, we shouldn't use it any more
/// - EACCES means something is fatally misconfigured about the pageserver, such
/// as running the process as the wrong user, or the filesystem having the wrong
/// ownership or permission bits. We terminate so that it's obvious to
/// the operator why the pageserver isn't working, and they can restart it when
/// they've fixed the problem.
#[derive(thiserror::Error, Debug)]
pub struct Error {
inner: std::io::Error,
context: Option<String>,
}
impl Error {
/// Wrap an io::Error with some context and terminate
/// the process if the io::Error matches our policy for termination
fn new_with_context(e: std::io::Error, context: &str) -> Self {
Self::build(e, Some(context.to_string()))
}
fn context(e: Self, context: &str) -> Self {
Self {
inner: e.inner,
context: Some(context.to_string()),
}
}
fn new(e: std::io::Error) -> Self {
Self::build(e, None)
}
fn invalid(reason: &str) -> Self {
Self::new(std::io::Error::new(ErrorKind::InvalidInput, reason))
}
fn build(e: std::io::Error, context: Option<String>) -> Self {
// Construct instance early so that we have it for
// using Display in termination message.
let instance = Self { inner: e, context };
// Maybe terminate: this violates the usual expectation that callers
// should make their own decisions about how to handle an Error, but
// it's worthwhile to avoid every single user of the local filesystem
// having to apply the same "terminate on errors" behavior.
if is_fatal_io_error(&instance.inner) {
on_fatal_io_error(&instance.inner);
}
instance
}
fn kind(&self) -> ErrorKind {
self.inner.kind()
}
}
impl From<std::io::Error> for Error {
fn from(e: std::io::Error) -> Self {
Self::build(e, None)
}
}
impl From<Error> for std::io::Error {
fn from(e: Error) -> std::io::Error {
e.inner
}
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match &self.context {
Some(context) => {
write!(f, "{}: {}", context, self.inner)
}
None => self.inner.fmt(f),
}
}
}
impl VirtualFile {
/// Open a file in read-only mode. Like File::open.
pub async fn open(path: &Utf8Path) -> Result<VirtualFile, std::io::Error> {
Self::open_with_options(path, OpenOptions::new().read(true)).await
pub async fn open(path: &Utf8Path) -> Result<VirtualFile, Error> {
Self::open_with_options(path, OpenOptions::new().read(true))
.await
.map_err(Error::from)
}
/// Create a new file for writing. If the file exists, it will be truncated.
/// Like File::create.
pub async fn create(path: &Utf8Path) -> Result<VirtualFile, std::io::Error> {
pub async fn create(path: &Utf8Path) -> Result<VirtualFile, Error> {
Self::open_with_options(
path,
OpenOptions::new().write(true).create(true).truncate(true),
)
.await
.map_err(Error::from)
}
/// Open a file with given options.
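
The pieces defined above fit together so that call sites never decide about termination themselves: wrapping the io::Error (via From or the constructors) classifies it, aborts the process if it is fatal, and otherwise hands back a retryable error that can also be converted back to std::io::Error where needed (as block_io does with .map_err(virtual_file::Error::into)). A compact standalone mirror of that behavior, using raw Linux errno values instead of the nix constants and hypothetical read_config helpers:

// Minimal mirror of the wrapper's behavior, for illustration only.
// The real type lives in pageserver/src/virtual_file.rs.
#[derive(Debug)]
struct IoError {
    inner: std::io::Error,
    context: Option<String>,
}

fn is_fatal(e: &std::io::Error) -> bool {
    // Linux errno values for EIO (5), EACCES (13), EROFS (30); the real code
    // uses the `nix` crate's constants instead of raw numbers.
    matches!(e.raw_os_error(), Some(5 | 13 | 30))
}

impl From<std::io::Error> for IoError {
    fn from(inner: std::io::Error) -> Self {
        // Terminate on fatal errors; otherwise wrap and return.
        if is_fatal(&inner) {
            eprintln!("Fatal I/O error: {inner}");
            std::process::abort();
        }
        IoError { inner, context: None }
    }
}

impl From<IoError> for std::io::Error {
    fn from(e: IoError) -> std::io::Error {
        e.inner
    }
}

// A caller just uses `?`; fatal errors never come back.
fn read_config(path: &std::path::Path) -> Result<Vec<u8>, IoError> {
    Ok(std::fs::read(path)?)
}

// Code that still needs std::io::Error (e.g. trait impls in block_io)
// can convert back with `.map_err(IoError::into)`.
fn read_config_io(path: &std::path::Path) -> Result<Vec<u8>, std::io::Error> {
    read_config(path).map_err(IoError::into)
}

fn main() {
    match read_config_io(std::path::Path::new("/nonexistent/config.toml")) {
        Ok(_) => println!("read ok"),
        Err(e) => println!("retryable error: {e}"),
    }
}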
@@ -232,7 +325,7 @@ impl VirtualFile {
pub async fn open_with_options(
path: &Utf8Path,
open_options: &OpenOptions,
) -> Result<VirtualFile, std::io::Error> {
) -> Result<VirtualFile, Error> {
let path_str = path.to_string();
let parts = path_str.split('/').collect::<Vec<&str>>();
let tenant_id;
@@ -284,14 +377,16 @@ impl VirtualFile {
final_path: &Utf8Path,
tmp_path: &Utf8Path,
content: &[u8],
) -> Result<(), CrashsafeOverwriteError> {
let Some(final_path_parent) = final_path.parent() else {
return Err(CrashsafeOverwriteError::FinalPathHasNoParentDir);
};
) -> Result<(), Error> {
let final_path_parent = final_path.parent().ok_or(std::io::Error::new(
ErrorKind::InvalidInput,
"Path must be absolute",
))?;
match std::fs::remove_file(tmp_path) {
Ok(()) => {}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
Err(e) => return Err(CrashsafeOverwriteError::RemovePreviousTempfile(e)),
Err(e) => return Err(Error::new_with_context(e, "removing tempfile")),
}
let mut file = Self::open_with_options(
tmp_path,
@@ -302,17 +397,17 @@ impl VirtualFile {
.create_new(true),
)
.await
.map_err(CrashsafeOverwriteError::CreateTempfile)?;
.map_err(|e| Error::context(e, "create tempfile"))?;
file.write_all(content)
.await
.map_err(CrashsafeOverwriteError::WriteContents)?;
.map_err(|e| Error::context(e, "write contents"))?;
file.sync_all()
.await
.map_err(CrashsafeOverwriteError::SyncTempfile)?;
.map_err(|e| Error::context(e, "sync tempfile"))?;
drop(file); // before the rename, that's important!
// renames are atomic
std::fs::rename(tmp_path, final_path)
.map_err(CrashsafeOverwriteError::RenameTempfileToFinalPath)?;
.map_err(|e| Error::new_with_context(e, "rename tempfile to final path"))?;
// Only open final path parent dirfd now, so that this operation only
// ever holds one VirtualFile fd at a time. That's important because
// the current `find_victim_slot` impl might pick the same slot for both
@@ -321,11 +416,11 @@ impl VirtualFile {
let final_parent_dirfd =
Self::open_with_options(final_path_parent, OpenOptions::new().read(true))
.await
.map_err(CrashsafeOverwriteError::OpenFinalPathParentDir)?;
.map_err(|e| Error::context(e, "open final path parent"))?;
final_parent_dirfd
.sync_all()
.await
.map_err(CrashsafeOverwriteError::SyncFinalPathParentDir)?;
.map_err(|e| Error::context(e, "sync final path parent"))?;
Ok(())
}
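
The crashsafe_overwrite body above is the standard tempfile-then-rename protocol: clean up any stale tempfile, write and fsync the new contents, rename over the final path, then fsync the parent directory so the rename itself is durable. A blocking std::fs sketch of the same sequence (simplified error handling; not the VirtualFile implementation):

use std::fs::{self, File, OpenOptions};
use std::io::{self, ErrorKind, Write};
use std::path::Path;

fn crashsafe_overwrite(final_path: &Path, tmp_path: &Path, content: &[u8]) -> io::Result<()> {
    let parent = final_path
        .parent()
        .ok_or_else(|| io::Error::new(ErrorKind::InvalidInput, "path must have a parent"))?;

    // 1. Remove any stale tempfile from a previous crash; missing is fine.
    match fs::remove_file(tmp_path) {
        Ok(()) => {}
        Err(e) if e.kind() == ErrorKind::NotFound => {}
        Err(e) => return Err(e),
    }

    // 2. Write the new contents to the tempfile and fsync it.
    let mut tmp = OpenOptions::new()
        .write(true)
        .create_new(true)
        .open(tmp_path)?;
    tmp.write_all(content)?;
    tmp.sync_all()?;
    drop(tmp); // close before the rename

    // 3. Atomically move the tempfile over the final path.
    fs::rename(tmp_path, final_path)?;

    // 4. fsync the parent directory so the rename is durable.
    File::open(parent)?.sync_all()?;
    Ok(())
}

fn main() -> io::Result<()> {
    crashsafe_overwrite(
        Path::new("/tmp/example.json"),
        Path::new("/tmp/example.json.tmp"),
        b"{}",
    )
}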
@@ -333,11 +428,13 @@ impl VirtualFile {
pub async fn sync_all(&self) -> Result<(), Error> {
self.with_file(StorageIoOperation::Fsync, |file| file.sync_all())
.await?
.map_err(Error::new)
}
pub async fn metadata(&self) -> Result<fs::Metadata, Error> {
self.with_file(StorageIoOperation::Metadata, |file| file.metadata())
.await?
.map_err(Error::new)
}
/// Helper function that looks up the underlying File for this VirtualFile,
@@ -432,13 +529,10 @@ impl VirtualFile {
SeekFrom::Current(offset) => {
let pos = self.pos as i128 + offset as i128;
if pos < 0 {
return Err(Error::new(
ErrorKind::InvalidInput,
"offset would be negative",
));
return Err(Error::invalid("offset would be negative"));
}
if pos > u64::MAX as i128 {
return Err(Error::new(ErrorKind::InvalidInput, "offset overflow"));
return Err(Error::invalid("offset overflow"));
}
self.pos = pos as u64;
}
@@ -451,10 +545,11 @@ impl VirtualFile {
while !buf.is_empty() {
match self.read_at(buf, offset).await {
Ok(0) => {
return Err(Error::new(
return Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"failed to fill whole buffer",
))
)
.into())
}
Ok(n) => {
buf = &mut buf[n..];
@@ -472,10 +567,11 @@ impl VirtualFile {
while !buf.is_empty() {
match self.write_at(buf, offset).await {
Ok(0) => {
return Err(Error::new(
return Err(std::io::Error::new(
std::io::ErrorKind::WriteZero,
"failed to write whole buffer",
));
)
.into());
}
Ok(n) => {
buf = &buf[n..];
@@ -492,10 +588,11 @@ impl VirtualFile {
while !buf.is_empty() {
match self.write(buf).await {
Ok(0) => {
return Err(Error::new(
return Err(std::io::Error::new(
std::io::ErrorKind::WriteZero,
"failed to write whole buffer",
));
)
.into());
}
Ok(n) => {
buf = &buf[n..];
@@ -507,7 +604,7 @@ impl VirtualFile {
Ok(())
}
async fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
async fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
let pos = self.pos;
let n = self.write_at(buf, pos).await?;
self.pos += n as u64;
@@ -523,7 +620,7 @@ impl VirtualFile {
.with_label_values(&["read", &self.tenant_id, &self.timeline_id])
.add(size as i64);
}
result
result.map_err(Error::new)
}
async fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
@@ -535,7 +632,7 @@ impl VirtualFile {
.with_label_values(&["write", &self.tenant_id, &self.timeline_id])
.add(size as i64);
}
result
result.map_err(Error::new)
}
}
@@ -544,7 +641,7 @@ impl VirtualFile {
pub(crate) async fn read_blk(
&self,
blknum: u32,
) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
) -> Result<crate::tenant::block_io::BlockLease<'_>, Error> {
use crate::page_cache::PAGE_SZ;
let mut buf = [0; PAGE_SZ];
self.read_exact_at(&mut buf, blknum as u64 * (PAGE_SZ as u64))
@@ -660,25 +757,25 @@ mod tests {
async fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> Result<(), Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset).await,
MaybeVirtualFile::File(file) => file.read_exact_at(buf, offset),
MaybeVirtualFile::File(file) => file.read_exact_at(buf, offset).map_err(Error::new),
}
}
async fn write_all_at(&self, buf: &[u8], offset: u64) -> Result<(), Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.write_all_at(buf, offset).await,
MaybeVirtualFile::File(file) => file.write_all_at(buf, offset),
MaybeVirtualFile::File(file) => file.write_all_at(buf, offset).map_err(Error::new),
}
}
async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.seek(pos).await,
MaybeVirtualFile::File(file) => file.seek(pos),
MaybeVirtualFile::File(file) => file.seek(pos).map_err(Error::new),
}
}
async fn write_all(&mut self, buf: &[u8]) -> Result<(), Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.write_all(buf).await,
MaybeVirtualFile::File(file) => file.write_all(buf),
MaybeVirtualFile::File(file) => file.write_all(buf).map_err(Error::new),
}
}
@@ -887,7 +984,7 @@ mod tests {
hdls.push(hdl);
}
for hdl in hdls {
hdl.await?;
hdl.await.expect("joining")
}
std::mem::forget(rt);