Revert "libs: make backoff::retry() take a cancellation token"

This reverts commit 8c2ff87f1a.
This commit is contained in:
John Spray
2023-08-30 10:26:15 +01:00
parent 5a217791fd
commit 2f58f39648
7 changed files with 7 additions and 71 deletions

1
Cargo.lock generated
View File

@@ -4915,7 +4915,6 @@ dependencies = [
"thiserror",
"tokio",
"tokio-stream",
"tokio-util",
"tracing",
"tracing-error",
"tracing-subscriber",

View File

@@ -26,7 +26,6 @@ serde_json.workspace = true
signal-hook.workspace = true
thiserror.workspace = true
tokio.workspace = true
tokio-util.workspace = true
tracing.workspace = true
tracing-error.workspace = true
tracing-subscriber = { workspace = true, features = ["json", "registry"] }

View File

@@ -1,27 +1,18 @@
use std::fmt::{Debug, Display};
use futures::Future;
use tokio_util::sync::CancellationToken;
pub const DEFAULT_BASE_BACKOFF_SECONDS: f64 = 0.1;
pub const DEFAULT_MAX_BACKOFF_SECONDS: f64 = 3.0;
pub async fn exponential_backoff(
n: u32,
base_increment: f64,
max_seconds: f64,
cancel: &CancellationToken,
) {
pub async fn exponential_backoff(n: u32, base_increment: f64, max_seconds: f64) {
let backoff_duration_seconds =
exponential_backoff_duration_seconds(n, base_increment, max_seconds);
if backoff_duration_seconds > 0.0 {
tracing::info!(
"Backoff: waiting {backoff_duration_seconds} seconds before processing with the task",
);
tokio::select! {
_ = tokio::time::sleep(std::time::Duration::from_secs_f64(backoff_duration_seconds)) => {},
_ = cancel.cancelled() => {}
}
tokio::time::sleep(std::time::Duration::from_secs_f64(backoff_duration_seconds)).await;
}
}
@@ -33,57 +24,28 @@ pub fn exponential_backoff_duration_seconds(n: u32, base_increment: f64, max_sec
}
}
/// Configure cancellation for a retried operation: when to cancel (the token), and
/// what kind of error to return on cancellation
pub struct Cancel<E, CF>
where
E: Display + Debug + 'static,
CF: Fn() -> E,
{
token: CancellationToken,
on_cancel: CF,
}
impl<E, CF> Cancel<E, CF>
where
E: Display + Debug + 'static,
CF: Fn() -> E,
{
pub fn new(token: CancellationToken, on_cancel: CF) -> Self {
Self { token, on_cancel }
}
}
/// retries passed operation until one of the following conditions are met:
/// Encountered error is considered as permanent (non-retryable)
/// Retries have been exhausted.
/// `is_permanent` closure should be used to provide distinction between permanent/non-permanent errors
/// When attempts cross `warn_threshold` function starts to emit log warnings.
/// `description` argument is added to log messages. Its value should identify the `op` is doing
/// `cancel` argument is required: any time we are looping on retry, we should be using a CancellationToken
/// to drop out promptly on shutdown.
pub async fn retry<T, O, F, E, CF>(
pub async fn retry<T, O, F, E>(
mut op: O,
is_permanent: impl Fn(&E) -> bool,
warn_threshold: u32,
max_retries: u32,
description: &str,
cancel: Cancel<E, CF>,
) -> Result<T, E>
where
// Not std::error::Error because anyhow::Error doesnt implement it.
// For context see https://github.com/dtolnay/anyhow/issues/63
E: Display + Debug + 'static,
E: Display + Debug,
O: FnMut() -> F,
F: Future<Output = Result<T, E>>,
CF: Fn() -> E,
{
let mut attempts = 0;
loop {
if cancel.token.is_cancelled() {
return Err((cancel.on_cancel)());
}
let result = op().await;
match result {
Ok(_) => {
@@ -118,7 +80,6 @@ where
attempts,
DEFAULT_BASE_BACKOFF_SECONDS,
DEFAULT_MAX_BACKOFF_SECONDS,
&cancel.token,
)
.await;
attempts += 1;
@@ -171,7 +132,6 @@ mod tests {
1,
1,
"work",
Cancel::new(CancellationToken::new(), || -> io::Error { unreachable!() }),
)
.await;
@@ -197,7 +157,6 @@ mod tests {
2,
2,
"work",
Cancel::new(CancellationToken::new(), || -> io::Error { unreachable!() }),
)
.await
.unwrap();
@@ -220,7 +179,6 @@ mod tests {
2,
2,
"work",
Cancel::new(CancellationToken::new(), || -> io::Error { unreachable!() }),
)
.await
.unwrap_err();

View File

@@ -7,7 +7,6 @@ use anyhow::Context;
use pageserver_api::models::TenantState;
use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath};
use tokio::sync::OwnedMutexGuard;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, instrument, warn, Instrument, Span};
use utils::{
@@ -83,8 +82,6 @@ async fn create_remote_delete_mark(
FAILED_UPLOAD_WARN_THRESHOLD,
FAILED_REMOTE_OP_RETRIES,
"mark_upload",
// TODO: thread a cancellation token into this code path
backoff::Cancel::new(CancellationToken::new(), || unreachable!()),
)
.await
.context("mark_upload")?;
@@ -174,8 +171,6 @@ async fn remove_tenant_remote_delete_mark(
FAILED_UPLOAD_WARN_THRESHOLD,
FAILED_REMOTE_OP_RETRIES,
"remove_tenant_remote_delete_mark",
// TODO: thread a cancellation token into this code path
backoff::Cancel::new(CancellationToken::new(), || unreachable!()),
)
.await
.context("remove_tenant_remote_delete_mark")?;
@@ -257,8 +252,6 @@ pub(crate) async fn remote_delete_mark_exists(
SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS,
SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS,
"fetch_tenant_deletion_mark",
// TODO: thread a cancellation token into this code path
backoff::Cancel::new(CancellationToken::new(), || unreachable!()),
)
.await;

View File

@@ -214,7 +214,6 @@ use chrono::{NaiveDateTime, Utc};
// re-export these
pub use download::{is_temp_download_file, list_remote_timelines};
use scopeguard::ScopeGuard;
use tokio_util::sync::CancellationToken;
use utils::backoff::{
self, exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
};
@@ -236,7 +235,6 @@ use crate::metrics::{
RemoteTimelineClientMetricsCallTrackSize, REMOTE_ONDEMAND_DOWNLOADED_BYTES,
REMOTE_ONDEMAND_DOWNLOADED_LAYERS,
};
use crate::task_mgr::shutdown_token;
use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::{
@@ -754,7 +752,7 @@ impl RemoteTimelineClient {
pausable_failpoint!("persist_deleted_index_part");
backoff::retry(
|| {
|| async {
upload::upload_index_part(
self.conf,
&self.storage_impl,
@@ -762,6 +760,7 @@ impl RemoteTimelineClient {
&self.timeline_id,
&index_part_with_deleted_at,
)
.await
},
|_e| false,
1,
@@ -770,7 +769,6 @@ impl RemoteTimelineClient {
// when executed as part of tenant deletion this happens in the background
2,
"persist_index_part_with_deleted_flag",
backoff::Cancel::new(CancellationToken::new(), || unreachable!()),
)
.await?;
@@ -854,7 +852,6 @@ impl RemoteTimelineClient {
FAILED_DOWNLOAD_WARN_THRESHOLD,
FAILED_REMOTE_OP_RETRIES,
"list_prefixes",
backoff::Cancel::new(shutdown_token(), || anyhow::anyhow!("Cancelled!")),
)
.await
.context("list prefixes")?;
@@ -1100,14 +1097,12 @@ impl RemoteTimelineClient {
}
// sleep until it's time to retry, or we're cancelled
let cancel = shutdown_token();
tokio::select! {
_ = task_mgr::shutdown_watcher() => { },
_ = exponential_backoff(
retries,
DEFAULT_BASE_BACKOFF_SECONDS,
DEFAULT_MAX_BACKOFF_SECONDS,
&cancel
) => { },
};
}

View File

@@ -11,7 +11,6 @@ use std::time::Duration;
use anyhow::{anyhow, Context};
use tokio::fs;
use tokio::io::AsyncWriteExt;
use tokio_util::sync::CancellationToken;
use utils::{backoff, crashsafe};
use crate::config::PageServerConf;
@@ -267,10 +266,6 @@ where
FAILED_DOWNLOAD_WARN_THRESHOLD,
FAILED_REMOTE_OP_RETRIES,
description,
// TODO: pass a cancellation token into this function
backoff::Cancel::new(CancellationToken::new(), || -> DownloadError {
unreachable!()
}),
)
.await
}

View File

@@ -17,7 +17,7 @@ use crate::metrics::{
WALRECEIVER_ACTIVE_MANAGERS, WALRECEIVER_BROKER_UPDATES, WALRECEIVER_CANDIDATES_ADDED,
WALRECEIVER_CANDIDATES_REMOVED, WALRECEIVER_SWITCHES,
};
use crate::task_mgr::{shutdown_token, TaskKind};
use crate::task_mgr::TaskKind;
use crate::tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline};
use anyhow::Context;
use chrono::{NaiveDateTime, Utc};
@@ -211,14 +211,11 @@ async fn subscribe_for_timeline_updates(
id: TenantTimelineId,
) -> Streaming<SafekeeperTimelineInfo> {
let mut attempt = 0;
let cancel = shutdown_token();
loop {
exponential_backoff(
attempt,
DEFAULT_BASE_BACKOFF_SECONDS,
DEFAULT_MAX_BACKOFF_SECONDS,
&cancel,
)
.await;
attempt += 1;