libs: make backoff::retry() take a cancellation token (#5065)

## Problem

Currently, anything that uses backoff::retry will delay the join of its
task by however long its backoff sleep is, multiplied by its max
retries.

Whenever we call a function that sleeps, we should be passing in a
CancellationToken.

## Summary of changes

- Add a `Cancel` type to backoff::retry that wraps a CancellationToken
and an error `Fn` to generate an error if the cancellation token fires.
- In call sites that already run in a `task_mgr` task, use
`shutdown_token()` to provide the token. In other locations, use a dead
`CancellationToken` to satisfy the interface, and leave a TODO to fix it
up when we broaden the use of explicit cancellation tokens.
This commit is contained in:
John Spray
2023-08-24 12:54:46 +01:00
committed by GitHub
parent d597e6d42b
commit 3e2f0ffb11
7 changed files with 83 additions and 15 deletions

View File

@@ -26,6 +26,7 @@ serde_json.workspace = true
signal-hook.workspace = true
thiserror.workspace = true
tokio.workspace = true
tokio-util.workspace = true
tracing.workspace = true
tracing-error.workspace = true
tracing-subscriber = { workspace = true, features = ["json", "registry"] }

View File

@@ -1,18 +1,31 @@
use std::fmt::{Debug, Display};
use futures::Future;
use tokio_util::sync::CancellationToken;
pub const DEFAULT_BASE_BACKOFF_SECONDS: f64 = 0.1;
pub const DEFAULT_MAX_BACKOFF_SECONDS: f64 = 3.0;
pub async fn exponential_backoff(n: u32, base_increment: f64, max_seconds: f64) {
pub async fn exponential_backoff(
n: u32,
base_increment: f64,
max_seconds: f64,
cancel: &CancellationToken,
) {
let backoff_duration_seconds =
exponential_backoff_duration_seconds(n, base_increment, max_seconds);
if backoff_duration_seconds > 0.0 {
tracing::info!(
"Backoff: waiting {backoff_duration_seconds} seconds before processing with the task",
);
tokio::time::sleep(std::time::Duration::from_secs_f64(backoff_duration_seconds)).await;
drop(
tokio::time::timeout(
std::time::Duration::from_secs_f64(backoff_duration_seconds),
cancel.cancelled(),
)
.await,
)
}
}
@@ -24,28 +37,57 @@ pub fn exponential_backoff_duration_seconds(n: u32, base_increment: f64, max_sec
}
}
/// Configure cancellation for a retried operation: when to cancel (the token), and
/// what kind of error to return on cancellation
pub struct Cancel<E, CF>
where
E: Display + Debug + 'static,
CF: Fn() -> E,
{
token: CancellationToken,
on_cancel: CF,
}
impl<E, CF> Cancel<E, CF>
where
E: Display + Debug + 'static,
CF: Fn() -> E,
{
pub fn new(token: CancellationToken, on_cancel: CF) -> Self {
Self { token, on_cancel }
}
}
/// retries passed operation until one of the following conditions are met:
/// Encountered error is considered as permanent (non-retryable)
/// Retries have been exhausted.
/// `is_permanent` closure should be used to provide distinction between permanent/non-permanent errors
/// When attempts cross `warn_threshold` function starts to emit log warnings.
/// `description` argument is added to log messages. Its value should identify the `op` is doing
pub async fn retry<T, O, F, E>(
/// `cancel` argument is required: any time we are looping on retry, we should be using a CancellationToken
/// to drop out promptly on shutdown.
pub async fn retry<T, O, F, E, CF>(
mut op: O,
is_permanent: impl Fn(&E) -> bool,
warn_threshold: u32,
max_retries: u32,
description: &str,
cancel: Cancel<E, CF>,
) -> Result<T, E>
where
// Not std::error::Error because anyhow::Error doesnt implement it.
// For context see https://github.com/dtolnay/anyhow/issues/63
E: Display + Debug,
E: Display + Debug + 'static,
O: FnMut() -> F,
F: Future<Output = Result<T, E>>,
CF: Fn() -> E,
{
let mut attempts = 0;
loop {
if cancel.token.is_cancelled() {
return Err((cancel.on_cancel)());
}
let result = op().await;
match result {
Ok(_) => {
@@ -80,6 +122,7 @@ where
attempts,
DEFAULT_BASE_BACKOFF_SECONDS,
DEFAULT_MAX_BACKOFF_SECONDS,
&cancel.token,
)
.await;
attempts += 1;
@@ -132,6 +175,7 @@ mod tests {
1,
1,
"work",
Cancel::new(CancellationToken::new(), || -> io::Error { unreachable!() }),
)
.await;
@@ -157,6 +201,7 @@ mod tests {
2,
2,
"work",
Cancel::new(CancellationToken::new(), || -> io::Error { unreachable!() }),
)
.await
.unwrap();
@@ -179,6 +224,7 @@ mod tests {
2,
2,
"work",
Cancel::new(CancellationToken::new(), || -> io::Error { unreachable!() }),
)
.await
.unwrap_err();