mirror of
https://github.com/neondatabase/neon.git
synced 2026-03-05 01:10:38 +00:00
Compare commits
6 Commits
parameteri
...
silence_si
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
57a9d7d1e2 | ||
|
|
fcc3da7642 | ||
|
|
8449572c0f | ||
|
|
45582a0ec4 | ||
|
|
c3a027b8b4 | ||
|
|
4feb12548b |
@@ -25,7 +25,11 @@ use tokio::io;
|
|||||||
use toml_edit::Item;
|
use toml_edit::Item;
|
||||||
use tracing::info;
|
use tracing::info;
|
||||||
|
|
||||||
pub use self::{local_fs::LocalFs, s3_bucket::S3Bucket, simulate_failures::UnreliableWrapper};
|
pub use self::{
|
||||||
|
local_fs::LocalFs,
|
||||||
|
s3_bucket::S3Bucket,
|
||||||
|
simulate_failures::{SimulatedError, UnreliableWrapper},
|
||||||
|
};
|
||||||
|
|
||||||
/// How many different timelines can be processed simultaneously when synchronizing layers with the remote storage.
|
/// How many different timelines can be processed simultaneously when synchronizing layers with the remote storage.
|
||||||
/// During regular work, pageserver produces one layer file per timeline checkpoint, with bursts of concurrency
|
/// During regular work, pageserver produces one layer file per timeline checkpoint, with bursts of concurrency
|
||||||
@@ -190,26 +194,44 @@ impl Debug for Download {
|
|||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum DownloadError {
|
pub enum DownloadError {
|
||||||
/// Validation or other error happened due to user input.
|
/// Validation or other error happened due to user input.
|
||||||
|
///
|
||||||
|
/// This is only used by LOCAL_FS.
|
||||||
BadInput(anyhow::Error),
|
BadInput(anyhow::Error),
|
||||||
|
|
||||||
/// The file was not found in the remote storage.
|
/// The file was not found in the remote storage.
|
||||||
|
///
|
||||||
|
/// This can only happen during download, never during delete.
|
||||||
NotFound,
|
NotFound,
|
||||||
/// The file was found in the remote storage, but the download failed.
|
|
||||||
|
/// The file was found in the remote storage, but the operation failed.
|
||||||
|
///
|
||||||
|
/// The error should have context already describing the real failed operation.
|
||||||
Other(anyhow::Error),
|
Other(anyhow::Error),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl std::fmt::Display for DownloadError {
|
impl std::fmt::Display for DownloadError {
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
use DownloadError::*;
|
||||||
match self {
|
match self {
|
||||||
DownloadError::BadInput(e) => {
|
NotFound => write!(f, "No file found for the remote object id given"),
|
||||||
write!(f, "Failed to download a remote file due to user input: {e}")
|
// this is the same as thiserror's `#[error(transparent)]`; it handles both {} and {:#}
|
||||||
}
|
Other(e) | BadInput(e) => std::fmt::Display::fmt(e, f),
|
||||||
DownloadError::NotFound => write!(f, "No file found for the remote object id given"),
|
|
||||||
DownloadError::Other(e) => write!(f, "Failed to download a remote file: {e:?}"),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl std::error::Error for DownloadError {}
|
impl std::error::Error for DownloadError {
|
||||||
|
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
|
||||||
|
use DownloadError::*;
|
||||||
|
match self {
|
||||||
|
NotFound => None,
|
||||||
|
Other(_) | BadInput(_) => {
|
||||||
|
// TODO: these are anyhow, cannot return here
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Every storage, currently supported.
|
/// Every storage, currently supported.
|
||||||
/// Serves as a simple way to pass around the [`RemoteStorage`] without dealing with generics.
|
/// Serves as a simple way to pass around the [`RemoteStorage`] without dealing with generics.
|
||||||
|
|||||||
@@ -345,7 +345,7 @@ impl RemoteStorage for S3Bucket {
|
|||||||
.set_max_keys(self.max_keys_per_list_response)
|
.set_max_keys(self.max_keys_per_list_response)
|
||||||
.send()
|
.send()
|
||||||
.await
|
.await
|
||||||
.context("Failed to list S3 prefixes")
|
.context("list S3 prefixes")
|
||||||
.map_err(DownloadError::Other);
|
.map_err(DownloadError::Other);
|
||||||
|
|
||||||
let started_at = ScopeGuard::into_inner(started_at);
|
let started_at = ScopeGuard::into_inner(started_at);
|
||||||
@@ -397,7 +397,7 @@ impl RemoteStorage for S3Bucket {
|
|||||||
.set_max_keys(self.max_keys_per_list_response)
|
.set_max_keys(self.max_keys_per_list_response)
|
||||||
.send()
|
.send()
|
||||||
.await
|
.await
|
||||||
.context("Failed to list files in S3 bucket");
|
.context("list files in S3 bucket");
|
||||||
|
|
||||||
let started_at = ScopeGuard::into_inner(started_at);
|
let started_at = ScopeGuard::into_inner(started_at);
|
||||||
metrics::BUCKET_METRICS
|
metrics::BUCKET_METRICS
|
||||||
@@ -521,10 +521,7 @@ impl RemoteStorage for S3Bucket {
|
|||||||
.deleted_objects_total
|
.deleted_objects_total
|
||||||
.inc_by(chunk.len() as u64);
|
.inc_by(chunk.len() as u64);
|
||||||
if let Some(errors) = resp.errors {
|
if let Some(errors) = resp.errors {
|
||||||
return Err(anyhow::format_err!(
|
return Err(anyhow::anyhow!("delete {} objects", errors.len()));
|
||||||
"Failed to delete {} objects",
|
|
||||||
errors.len()
|
|
||||||
));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ pub struct UnreliableWrapper {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Used to identify retries of different unique operations.
|
/// Used to identify retries of different unique operations.
|
||||||
#[derive(Debug, Hash, Eq, PartialEq)]
|
#[derive(Hash, Eq, PartialEq)]
|
||||||
enum RemoteOp {
|
enum RemoteOp {
|
||||||
ListPrefixes(Option<RemotePath>),
|
ListPrefixes(Option<RemotePath>),
|
||||||
Upload(RemotePath),
|
Upload(RemotePath),
|
||||||
@@ -27,6 +27,22 @@ enum RemoteOp {
|
|||||||
DeleteObjects(Vec<RemotePath>),
|
DeleteObjects(Vec<RemotePath>),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Debug for RemoteOp {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
use RemoteOp::*;
|
||||||
|
match self {
|
||||||
|
ListPrefixes(arg0) => f.debug_tuple("ListPrefixes").field(arg0).finish(),
|
||||||
|
Upload(arg0) => f.debug_tuple("Upload").field(arg0).finish(),
|
||||||
|
Download(arg0) => f.debug_tuple("Download").field(arg0).finish(),
|
||||||
|
Delete(arg0) => f.debug_tuple("Delete").field(arg0).finish(),
|
||||||
|
DeleteObjects(many) if many.len() > 3 => {
|
||||||
|
write!(f, "DeleteObjects({} paths)", many.len())
|
||||||
|
}
|
||||||
|
DeleteObjects(few) => f.debug_tuple("DeleteObjects").field(few).finish(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl UnreliableWrapper {
|
impl UnreliableWrapper {
|
||||||
pub fn new(inner: crate::GenericRemoteStorage, attempts_to_fail: u64) -> Self {
|
pub fn new(inner: crate::GenericRemoteStorage, attempts_to_fail: u64) -> Self {
|
||||||
assert!(attempts_to_fail > 0);
|
assert!(attempts_to_fail > 0);
|
||||||
@@ -59,13 +75,12 @@ impl UnreliableWrapper {
|
|||||||
e.remove();
|
e.remove();
|
||||||
Ok(attempts_before_this)
|
Ok(attempts_before_this)
|
||||||
} else {
|
} else {
|
||||||
let error =
|
let error = anyhow::anyhow!(SimulatedError::from(e.key()));
|
||||||
anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
|
|
||||||
Err(DownloadError::Other(error))
|
Err(DownloadError::Other(error))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Entry::Vacant(e) => {
|
Entry::Vacant(e) => {
|
||||||
let error = anyhow::anyhow!("simulated failure of remote operation {:?}", e.key());
|
let error = anyhow::anyhow!(SimulatedError::from(e.key()));
|
||||||
e.insert(1);
|
e.insert(1);
|
||||||
Err(DownloadError::Other(error))
|
Err(DownloadError::Other(error))
|
||||||
}
|
}
|
||||||
@@ -80,6 +95,26 @@ impl UnreliableWrapper {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// `pub` type so that callers can check whether this is the root cause of an error when logging it.
|
||||||
|
///
|
||||||
|
/// This is just a string to avoid cloning a huge number of paths a second time.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct SimulatedError(String);
|
||||||
|
|
||||||
|
impl<'a> From<&'a RemoteOp> for SimulatedError {
|
||||||
|
fn from(value: &'_ RemoteOp) -> Self {
|
||||||
|
SimulatedError(format!("simulated failure of remote operation {:?}", value))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Display for SimulatedError {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
f.write_str(&self.0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::error::Error for SimulatedError {}
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
impl RemoteStorage for UnreliableWrapper {
|
impl RemoteStorage for UnreliableWrapper {
|
||||||
async fn list_prefixes(
|
async fn list_prefixes(
|
||||||
|
|||||||
@@ -1161,7 +1161,11 @@ impl RemoteTimelineClient {
|
|||||||
// at info level at first, and only WARN if the operation fails repeatedly.
|
// at info level at first, and only WARN if the operation fails repeatedly.
|
||||||
//
|
//
|
||||||
// (See similar logic for downloads in `download::download_retry`)
|
// (See similar logic for downloads in `download::download_retry`)
|
||||||
if retries < FAILED_UPLOAD_WARN_THRESHOLD {
|
|
||||||
|
let is_simulated = cfg!(feature = "testing")
|
||||||
|
&& e.root_cause().is::<remote_storage::SimulatedError>();
|
||||||
|
|
||||||
|
if retries < FAILED_UPLOAD_WARN_THRESHOLD || is_simulated {
|
||||||
info!(
|
info!(
|
||||||
"failed to perform remote task {}, will retry (attempt {}): {:#}",
|
"failed to perform remote task {}, will retry (attempt {}): {:#}",
|
||||||
task.op, retries, e
|
task.op, retries, e
|
||||||
|
|||||||
Reference in New Issue
Block a user