Compare commits

...

4 Commits

Author SHA1 Message Date
Alex Chi Z
406d8ee7d9 fix tests
Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-11-13 15:09:33 -05:00
Alex Chi Z
f59e8be7fd fix clippy warnings
Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-11-13 14:48:00 -05:00
Alex Chi Z
be9db748d9 jitter
Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-11-13 14:30:26 -05:00
Alex Chi Z
2b679b100b fix(pageserver): make backoff long enough
Signed-off-by: Alex Chi Z <chi@neon.tech>
2024-11-13 14:24:53 -05:00
3 changed files with 51 additions and 14 deletions

View File

@@ -1,11 +1,15 @@
use std::fmt::{Debug, Display};
use futures::Future;
use rand::Rng;
use tokio_util::sync::CancellationToken;
/// Default `base_increment` passed by [`retry`] to the backoff computation; the delay
/// grows as `(1 + base_increment)^n`, so this is the per-attempt growth rate.
pub const DEFAULT_BASE_BACKOFF_SECONDS: f64 = 0.1;
/// Default `max_seconds` passed by [`retry`]: the cap applied to a computed backoff.
pub const DEFAULT_MAX_BACKOFF_SECONDS: f64 = 3.0;
/// Larger `base_increment` intended for network operations (per the name); callers opt in
/// by passing it explicitly, e.g. to [`retry_with_options`].
pub const DEFAULT_NETWORK_BASE_BACKOFF_SECONDS: f64 = 1.5;
/// Larger `max_seconds` cap paired with [`DEFAULT_NETWORK_BASE_BACKOFF_SECONDS`].
pub const DEFAULT_NETWORK_MAX_BACKOFF_SECONDS: f64 = 60.0;
pub async fn exponential_backoff(
n: u32,
base_increment: f64,
@@ -32,11 +36,41 @@ pub async fn exponential_backoff(
pub fn exponential_backoff_duration_seconds(n: u32, base_increment: f64, max_seconds: f64) -> f64 {
if n == 0 {
0.0
} else if base_increment == 0.0 {
1.0
} else {
(1.0 + base_increment).powf(f64::from(n)).min(max_seconds)
((1.0 + base_increment).powf(f64::from(n))
+ rand::thread_rng().gen_range(0.0..base_increment))
.min(max_seconds)
}
}
/// Retries `op` using the default backoff parameters.
///
/// This is a thin wrapper over [`retry_with_options`]: every argument is forwarded
/// unchanged, with [`DEFAULT_BASE_BACKOFF_SECONDS`] and [`DEFAULT_MAX_BACKOFF_SECONDS`]
/// supplied as the backoff base increment and cap. The returned value is exactly what
/// [`retry_with_options`] returns.
pub async fn retry<T, O, F, E>(
    op: O,
    is_permanent: impl Fn(&E) -> bool,
    warn_threshold: u32,
    max_retries: u32,
    description: &str,
    cancel: &CancellationToken,
) -> Option<Result<T, E>>
where
    O: FnMut() -> F,
    F: Future<Output = Result<T, E>>,
    E: Display + Debug + 'static,
{
    // Build the retry future with the default backoff settings, then drive it to completion.
    let with_default_backoff = retry_with_options(
        op,
        is_permanent,
        warn_threshold,
        max_retries,
        description,
        cancel,
        DEFAULT_BASE_BACKOFF_SECONDS,
        DEFAULT_MAX_BACKOFF_SECONDS,
    );
    with_default_backoff.await
}
/// Retries the passed operation until one of the following conditions is met:
/// - encountered error is considered as permanent (non-retryable)
/// - retries have been exhausted
@@ -51,13 +85,16 @@ pub fn exponential_backoff_duration_seconds(n: u32, base_increment: f64, max_sec
/// for any other error type. Final failed attempt is logged with `{:?}`.
///
/// Returns `None` if cancellation was noticed during backoff or the terminal result.
pub async fn retry<T, O, F, E>(
#[allow(clippy::too_many_arguments)]
pub async fn retry_with_options<T, O, F, E>(
mut op: O,
is_permanent: impl Fn(&E) -> bool,
warn_threshold: u32,
max_retries: u32,
description: &str,
cancel: &CancellationToken,
base_increment: f64,
max_seconds: f64,
) -> Option<Result<T, E>>
where
// Not std::error::Error because anyhow::Error doesn't implement it.
@@ -102,13 +139,7 @@ where
}
}
// sleep and retry
exponential_backoff(
attempts,
DEFAULT_BASE_BACKOFF_SECONDS,
DEFAULT_MAX_BACKOFF_SECONDS,
cancel,
)
.await;
exponential_backoff(attempts, base_increment, max_seconds, cancel).await;
attempts += 1;
}
}
@@ -132,7 +163,8 @@ mod tests {
if let Some(old_backoff_value) = current_backoff_value.replace(new_backoff_value) {
assert!(
old_backoff_value <= new_backoff_value,
// accommodate the randomness of the backoff
old_backoff_value - DEFAULT_BASE_BACKOFF_SECONDS <= new_backoff_value,
"{i}th backoff value {new_backoff_value} is smaller than the previous one {old_backoff_value}"
)
}

View File

@@ -693,13 +693,15 @@ where
O: FnMut() -> F,
F: Future<Output = Result<T, DownloadError>>,
{
backoff::retry(
backoff::retry_with_options(
op,
DownloadError::is_permanent,
FAILED_DOWNLOAD_WARN_THRESHOLD,
FAILED_REMOTE_OP_RETRIES,
description,
cancel,
backoff::DEFAULT_NETWORK_BASE_BACKOFF_SECONDS,
backoff::DEFAULT_NETWORK_MAX_BACKOFF_SECONDS,
)
.await
.ok_or_else(|| DownloadError::Cancelled)
@@ -715,13 +717,15 @@ where
O: FnMut() -> F,
F: Future<Output = Result<T, DownloadError>>,
{
backoff::retry(
backoff::retry_with_options(
op,
DownloadError::is_permanent,
FAILED_DOWNLOAD_WARN_THRESHOLD,
u32::MAX,
description,
cancel,
backoff::DEFAULT_NETWORK_BASE_BACKOFF_SECONDS,
backoff::DEFAULT_NETWORK_MAX_BACKOFF_SECONDS,
)
.await
.ok_or_else(|| DownloadError::Cancelled)

View File

@@ -8,6 +8,7 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Weak};
use std::time::{Duration, SystemTime};
use tracing::Instrument;
use utils::backoff;
use utils::id::TimelineId;
use utils::lsn::Lsn;
use utils::sync::{gate, heavier_once_cell};
@@ -1204,8 +1205,8 @@ impl LayerInner {
let backoff = utils::backoff::exponential_backoff_duration_seconds(
consecutive_failures.min(u32::MAX as usize) as u32,
1.5,
60.0,
backoff::DEFAULT_NETWORK_BASE_BACKOFF_SECONDS,
backoff::DEFAULT_NETWORK_MAX_BACKOFF_SECONDS,
);
let backoff = std::time::Duration::from_secs_f64(backoff);