refactor: needless cancellation token cloning (#6618)

The solution we ended up with for `backoff::retry` requires always cloning
the cancellation token even though it is only ever `.await`ed. Fix that, and
also turn the return type into `Option<Result<T, E>>`, avoiding the need
for the `E::cancelled()` fn passed in.

Cc: #6096
Joonas Koivunen
2024-02-06 09:39:06 +02:00
committed by GitHub
parent 8e114bd610
commit 947165788d
20 changed files with 156 additions and 154 deletions
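
For orientation (not part of the commit itself), here is a minimal sketch of the new call-site pattern. The `NotifyError` definition and the no-op operation below are hypothetical stand-ins; the `utils::backoff::retry` signature, the borrowed token, and the `.ok_or_else(..).and_then(|x| x)` mapping mirror the hunks that follow.

```rust
use tokio_util::sync::CancellationToken;
use utils::backoff;

// Hypothetical caller-side error type; the real `NotifyError` in the
// compute-hook hunk has more variants.
#[derive(Debug, thiserror::Error)]
enum NotifyError {
    #[error("shutting down")]
    ShuttingDown,
}

async fn notify(cancel: &CancellationToken) -> Result<(), NotifyError> {
    backoff::retry(
        || async {
            // hypothetical fallible remote call
            Ok::<(), NotifyError>(())
        },
        |e| matches!(e, NotifyError::ShuttingDown),
        3,
        10,
        "send compute notification",
        // Previously: backoff::Cancel::new(cancel.clone(), || NotifyError::ShuttingDown)
        cancel,
    )
    .await
    // `None` means the token fired; map it back to the caller's own error.
    .ok_or_else(|| NotifyError::ShuttingDown)
    // Flatten the nested Option<Result<..>> into Result<..>.
    .and_then(|x| x)
}
```

The point of the change: `retry` borrows the token instead of taking an owned clone, and signals cancellation through `None` rather than through a caller-supplied error constructor.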

@@ -244,9 +244,11 @@ impl ComputeHook {
3, 3,
10, 10,
"Send compute notification", "Send compute notification",
backoff::Cancel::new(cancel.clone(), || NotifyError::ShuttingDown), cancel,
) )
.await .await
.ok_or_else(|| NotifyError::ShuttingDown)
.and_then(|x| x)
} }
/// Call this to notify the compute (postgres) tier of new pageservers to use /// Call this to notify the compute (postgres) tier of new pageservers to use

@@ -379,7 +379,7 @@ impl RemoteStorage for AzureBlobStorage {
_prefix: Option<&RemotePath>, _prefix: Option<&RemotePath>,
_timestamp: SystemTime, _timestamp: SystemTime,
_done_if_after: SystemTime, _done_if_after: SystemTime,
_cancel: CancellationToken, _cancel: &CancellationToken,
) -> Result<(), TimeTravelError> { ) -> Result<(), TimeTravelError> {
// TODO use Azure point in time recovery feature for this // TODO use Azure point in time recovery feature for this
// https://learn.microsoft.com/en-us/azure/storage/blobs/point-in-time-restore-overview // https://learn.microsoft.com/en-us/azure/storage/blobs/point-in-time-restore-overview

@@ -218,7 +218,7 @@ pub trait RemoteStorage: Send + Sync + 'static {
prefix: Option<&RemotePath>, prefix: Option<&RemotePath>,
timestamp: SystemTime, timestamp: SystemTime,
done_if_after: SystemTime, done_if_after: SystemTime,
cancel: CancellationToken, cancel: &CancellationToken,
) -> Result<(), TimeTravelError>; ) -> Result<(), TimeTravelError>;
} }
@@ -442,7 +442,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
prefix: Option<&RemotePath>, prefix: Option<&RemotePath>,
timestamp: SystemTime, timestamp: SystemTime,
done_if_after: SystemTime, done_if_after: SystemTime,
cancel: CancellationToken, cancel: &CancellationToken,
) -> Result<(), TimeTravelError> { ) -> Result<(), TimeTravelError> {
match self { match self {
Self::LocalFs(s) => { Self::LocalFs(s) => {

@@ -431,7 +431,7 @@ impl RemoteStorage for LocalFs {
_prefix: Option<&RemotePath>, _prefix: Option<&RemotePath>,
_timestamp: SystemTime, _timestamp: SystemTime,
_done_if_after: SystemTime, _done_if_after: SystemTime,
_cancel: CancellationToken, _cancel: &CancellationToken,
) -> Result<(), TimeTravelError> { ) -> Result<(), TimeTravelError> {
Err(TimeTravelError::Unimplemented) Err(TimeTravelError::Unimplemented)
} }

@@ -638,7 +638,7 @@ impl RemoteStorage for S3Bucket {
prefix: Option<&RemotePath>, prefix: Option<&RemotePath>,
timestamp: SystemTime, timestamp: SystemTime,
done_if_after: SystemTime, done_if_after: SystemTime,
cancel: CancellationToken, cancel: &CancellationToken,
) -> Result<(), TimeTravelError> { ) -> Result<(), TimeTravelError> {
let kind = RequestKind::TimeTravel; let kind = RequestKind::TimeTravel;
let _guard = self.permit(kind).await; let _guard = self.permit(kind).await;
@@ -678,9 +678,11 @@ impl RemoteStorage for S3Bucket {
warn_threshold, warn_threshold,
max_retries, max_retries,
"listing object versions for time_travel_recover", "listing object versions for time_travel_recover",
backoff::Cancel::new(cancel.clone(), || TimeTravelError::Cancelled), cancel,
) )
.await?; .await
.ok_or_else(|| TimeTravelError::Cancelled)
.and_then(|x| x)?;
tracing::trace!( tracing::trace!(
" Got List response version_id_marker={:?}, key_marker={:?}", " Got List response version_id_marker={:?}, key_marker={:?}",
@@ -805,9 +807,11 @@ impl RemoteStorage for S3Bucket {
warn_threshold, warn_threshold,
max_retries, max_retries,
"copying object version for time_travel_recover", "copying object version for time_travel_recover",
backoff::Cancel::new(cancel.clone(), || TimeTravelError::Cancelled), cancel,
) )
.await?; .await
.ok_or_else(|| TimeTravelError::Cancelled)
.and_then(|x| x)?;
tracing::info!(%version_id, %key, "Copied old version in S3"); tracing::info!(%version_id, %key, "Copied old version in S3");
} }
VerOrDelete { VerOrDelete {

@@ -190,7 +190,7 @@ impl RemoteStorage for UnreliableWrapper {
prefix: Option<&RemotePath>, prefix: Option<&RemotePath>,
timestamp: SystemTime, timestamp: SystemTime,
done_if_after: SystemTime, done_if_after: SystemTime,
cancel: CancellationToken, cancel: &CancellationToken,
) -> Result<(), TimeTravelError> { ) -> Result<(), TimeTravelError> {
self.attempt(RemoteOp::TimeTravelRecover(prefix.map(|p| p.to_owned()))) self.attempt(RemoteOp::TimeTravelRecover(prefix.map(|p| p.to_owned())))
.map_err(|e| TimeTravelError::Other(anyhow::Error::new(e)))?; .map_err(|e| TimeTravelError::Other(anyhow::Error::new(e)))?;

@@ -56,9 +56,10 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
warn_threshold, warn_threshold,
max_retries, max_retries,
"test retry", "test retry",
backoff::Cancel::new(CancellationToken::new(), || unreachable!()), &CancellationToken::new(),
) )
.await .await
.expect("never cancelled")
} }
async fn time_point() -> SystemTime { async fn time_point() -> SystemTime {
@@ -76,6 +77,8 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
.collect::<HashSet<_>>()) .collect::<HashSet<_>>())
} }
let cancel = CancellationToken::new();
let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str())) let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str()))
.with_context(|| "RemotePath conversion")?; .with_context(|| "RemotePath conversion")?;
@@ -142,7 +145,7 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
// No changes after recovery to t2 (no-op) // No changes after recovery to t2 (no-op)
let t_final = time_point().await; let t_final = time_point().await;
ctx.client ctx.client
.time_travel_recover(None, t2, t_final, CancellationToken::new()) .time_travel_recover(None, t2, t_final, &cancel)
.await?; .await?;
let t2_files_recovered = list_files(&ctx.client).await?; let t2_files_recovered = list_files(&ctx.client).await?;
println!("after recovery to t2: {t2_files_recovered:?}"); println!("after recovery to t2: {t2_files_recovered:?}");
@@ -153,7 +156,7 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
// after recovery to t1: path1 is back, path2 has the old content // after recovery to t1: path1 is back, path2 has the old content
let t_final = time_point().await; let t_final = time_point().await;
ctx.client ctx.client
.time_travel_recover(None, t1, t_final, CancellationToken::new()) .time_travel_recover(None, t1, t_final, &cancel)
.await?; .await?;
let t1_files_recovered = list_files(&ctx.client).await?; let t1_files_recovered = list_files(&ctx.client).await?;
println!("after recovery to t1: {t1_files_recovered:?}"); println!("after recovery to t1: {t1_files_recovered:?}");
@@ -164,7 +167,7 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
// after recovery to t0: everything is gone except for path1 // after recovery to t0: everything is gone except for path1
let t_final = time_point().await; let t_final = time_point().await;
ctx.client ctx.client
.time_travel_recover(None, t0, t_final, CancellationToken::new()) .time_travel_recover(None, t0, t_final, &cancel)
.await?; .await?;
let t0_files_recovered = list_files(&ctx.client).await?; let t0_files_recovered = list_files(&ctx.client).await?;
println!("after recovery to t0: {t0_files_recovered:?}"); println!("after recovery to t0: {t0_files_recovered:?}");

@@ -37,69 +37,53 @@ pub fn exponential_backoff_duration_seconds(n: u32, base_increment: f64, max_sec
} }
} }
/// Configure cancellation for a retried operation: when to cancel (the token), and /// Retries passed operation until one of the following conditions are met:
/// what kind of error to return on cancellation /// - encountered error is considered as permanent (non-retryable)
pub struct Cancel<E, CF> /// - retries have been exhausted
where /// - cancellation token has been cancelled
E: Display + Debug + 'static, ///
CF: Fn() -> E, /// `is_permanent` closure should be used to provide distinction between permanent/non-permanent
{ /// errors. When attempts cross `warn_threshold` function starts to emit log warnings.
token: CancellationToken,
on_cancel: CF,
}
impl<E, CF> Cancel<E, CF>
where
E: Display + Debug + 'static,
CF: Fn() -> E,
{
pub fn new(token: CancellationToken, on_cancel: CF) -> Self {
Self { token, on_cancel }
}
}
/// retries passed operation until one of the following conditions are met:
/// Encountered error is considered as permanent (non-retryable)
/// Retries have been exhausted.
/// `is_permanent` closure should be used to provide distinction between permanent/non-permanent errors
/// When attempts cross `warn_threshold` function starts to emit log warnings.
/// `description` argument is added to log messages. Its value should identify the `op` is doing /// `description` argument is added to log messages. Its value should identify the `op` is doing
/// `cancel` argument is required: any time we are looping on retry, we should be using a CancellationToken /// `cancel` cancels new attempts and the backoff sleep.
/// to drop out promptly on shutdown. ///
pub async fn retry<T, O, F, E, CF>( /// If attempts fail, they are being logged with `{:#}` which works for anyhow, but does not work
/// for any other error type. Final failed attempt is logged with `{:?}`.
///
/// Returns `None` if cancellation was noticed during backoff or the terminal result.
pub async fn retry<T, O, F, E>(
mut op: O, mut op: O,
is_permanent: impl Fn(&E) -> bool, is_permanent: impl Fn(&E) -> bool,
warn_threshold: u32, warn_threshold: u32,
max_retries: u32, max_retries: u32,
description: &str, description: &str,
cancel: Cancel<E, CF>, cancel: &CancellationToken,
) -> Result<T, E> ) -> Option<Result<T, E>>
where where
// Not std::error::Error because anyhow::Error doesnt implement it. // Not std::error::Error because anyhow::Error doesnt implement it.
// For context see https://github.com/dtolnay/anyhow/issues/63 // For context see https://github.com/dtolnay/anyhow/issues/63
E: Display + Debug + 'static, E: Display + Debug + 'static,
O: FnMut() -> F, O: FnMut() -> F,
F: Future<Output = Result<T, E>>, F: Future<Output = Result<T, E>>,
CF: Fn() -> E,
{ {
let mut attempts = 0; let mut attempts = 0;
loop { loop {
if cancel.token.is_cancelled() { if cancel.is_cancelled() {
return Err((cancel.on_cancel)()); return None;
} }
let result = op().await; let result = op().await;
match result { match &result {
Ok(_) => { Ok(_) => {
if attempts > 0 { if attempts > 0 {
tracing::info!("{description} succeeded after {attempts} retries"); tracing::info!("{description} succeeded after {attempts} retries");
} }
return result; return Some(result);
} }
// These are "permanent" errors that should not be retried. // These are "permanent" errors that should not be retried.
Err(ref e) if is_permanent(e) => { Err(e) if is_permanent(e) => {
return result; return Some(result);
} }
// Assume that any other failure might be transient, and the operation might // Assume that any other failure might be transient, and the operation might
// succeed if we just keep trying. // succeed if we just keep trying.
@@ -109,12 +93,12 @@ where
Err(err) if attempts < max_retries => { Err(err) if attempts < max_retries => {
tracing::warn!("{description} failed, will retry (attempt {attempts}): {err:#}"); tracing::warn!("{description} failed, will retry (attempt {attempts}): {err:#}");
} }
Err(ref err) => { Err(err) => {
// Operation failed `max_attempts` times. Time to give up. // Operation failed `max_attempts` times. Time to give up.
tracing::warn!( tracing::warn!(
"{description} still failed after {attempts} retries, giving up: {err:?}" "{description} still failed after {attempts} retries, giving up: {err:?}"
); );
return result; return Some(result);
} }
} }
// sleep and retry // sleep and retry
@@ -122,7 +106,7 @@ where
attempts, attempts,
DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_BASE_BACKOFF_SECONDS,
DEFAULT_MAX_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
&cancel.token, cancel,
) )
.await; .await;
attempts += 1; attempts += 1;
@@ -131,11 +115,9 @@ where
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::io;
use tokio::sync::Mutex;
use super::*; use super::*;
use std::io;
use tokio::sync::Mutex;
#[test] #[test]
fn backoff_defaults_produce_growing_backoff_sequence() { fn backoff_defaults_produce_growing_backoff_sequence() {
@@ -166,7 +148,7 @@ mod tests {
#[tokio::test(start_paused = true)] #[tokio::test(start_paused = true)]
async fn retry_always_error() { async fn retry_always_error() {
let count = Mutex::new(0); let count = Mutex::new(0);
let err_result = retry( retry(
|| async { || async {
*count.lock().await += 1; *count.lock().await += 1;
Result::<(), io::Error>::Err(io::Error::from(io::ErrorKind::Other)) Result::<(), io::Error>::Err(io::Error::from(io::ErrorKind::Other))
@@ -175,11 +157,11 @@ mod tests {
1, 1,
1, 1,
"work", "work",
Cancel::new(CancellationToken::new(), || -> io::Error { unreachable!() }), &CancellationToken::new(),
) )
.await; .await
.expect("not cancelled")
assert!(err_result.is_err()); .expect_err("it can only fail");
assert_eq!(*count.lock().await, 2); assert_eq!(*count.lock().await, 2);
} }
@@ -201,10 +183,11 @@ mod tests {
2, 2,
2, 2,
"work", "work",
Cancel::new(CancellationToken::new(), || -> io::Error { unreachable!() }), &CancellationToken::new(),
) )
.await .await
.unwrap(); .expect("not cancelled")
.expect("success on second try");
} }
#[tokio::test(start_paused = true)] #[tokio::test(start_paused = true)]
@@ -224,10 +207,11 @@ mod tests {
2, 2,
2, 2,
"work", "work",
Cancel::new(CancellationToken::new(), || -> io::Error { unreachable!() }), &CancellationToken::new(),
) )
.await .await
.unwrap_err(); .expect("was not cancellation")
.expect_err("it was permanent error");
assert_eq!(*count.lock().await, 1); assert_eq!(*count.lock().await, 1);
} }
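
To make the cancellation contract concrete, a small illustrative test (not part of this diff, and only a sketch written against the new `retry` shown above): a token that is already cancelled short-circuits before the operation ever runs, and the caller sees `None`.

```rust
use std::sync::atomic::{AtomicU32, Ordering};

use tokio_util::sync::CancellationToken;
use utils::backoff;

#[tokio::test(start_paused = true)]
async fn retry_returns_none_on_cancellation() {
    let calls = AtomicU32::new(0);
    let cancel = CancellationToken::new();
    cancel.cancel();

    // The token is already cancelled, so the operation is never invoked and
    // the overall result is `None` instead of a synthesized "cancelled" error.
    let res = backoff::retry(
        || async {
            calls.fetch_add(1, Ordering::SeqCst);
            Result::<(), std::io::Error>::Err(std::io::Error::from(std::io::ErrorKind::Other))
        },
        |_| false,
        2,
        2,
        "work",
        &cancel,
    )
    .await;

    assert!(res.is_none());
    assert_eq!(calls.load(Ordering::SeqCst), 0);
}
```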

@@ -262,35 +262,33 @@ async fn upload(
) -> Result<(), UploadError> { ) -> Result<(), UploadError> {
let warn_after = 3; let warn_after = 3;
let max_attempts = 10; let max_attempts = 10;
// this is used only with tests so far
let last_value = if is_last { "true" } else { "false" };
let res = utils::backoff::retry( let res = utils::backoff::retry(
move || { || async {
let body = body.clone(); let res = client
async move { .post(metric_collection_endpoint.clone())
let res = client .header(reqwest::header::CONTENT_TYPE, "application/json")
.post(metric_collection_endpoint.clone()) .header(LAST_IN_BATCH.clone(), last_value)
.header(reqwest::header::CONTENT_TYPE, "application/json") .body(body.clone())
.header( .send()
LAST_IN_BATCH.clone(), .await;
if is_last { "true" } else { "false" },
)
.body(body)
.send()
.await;
let res = res.and_then(|res| res.error_for_status()); let res = res.and_then(|res| res.error_for_status());
// 10 redirects are normally allowed, so we don't need worry about 3xx // 10 redirects are normally allowed, so we don't need worry about 3xx
match res { match res {
Ok(_response) => Ok(()), Ok(_response) => Ok(()),
Err(e) => { Err(e) => {
let status = e.status().filter(|s| s.is_client_error()); let status = e.status().filter(|s| s.is_client_error());
if let Some(status) = status { if let Some(status) = status {
// rejection used to be a thing when the server could reject a // rejection used to be a thing when the server could reject a
// whole batch of metrics if one metric was bad. // whole batch of metrics if one metric was bad.
Err(UploadError::Rejected(status)) Err(UploadError::Rejected(status))
} else { } else {
Err(UploadError::Reqwest(e)) Err(UploadError::Reqwest(e))
}
} }
} }
} }
@@ -299,9 +297,11 @@ async fn upload(
warn_after, warn_after,
max_attempts, max_attempts,
"upload consumption_metrics", "upload consumption_metrics",
utils::backoff::Cancel::new(cancel.clone(), || UploadError::Cancelled), cancel,
) )
.await; .await
.ok_or_else(|| UploadError::Cancelled)
.and_then(|x| x);
match &res { match &res {
Ok(_) => {} Ok(_) => {}

@@ -82,46 +82,29 @@ impl ControlPlaneClient {
R: Serialize, R: Serialize,
T: DeserializeOwned, T: DeserializeOwned,
{ {
#[derive(thiserror::Error, Debug)] let res = backoff::retry(
enum RemoteAttemptError {
#[error("shutdown")]
Shutdown,
#[error("remote: {0}")]
Remote(reqwest::Error),
}
match backoff::retry(
|| async { || async {
let response = self let response = self
.http_client .http_client
.post(url.clone()) .post(url.clone())
.json(&request) .json(&request)
.send() .send()
.await .await?;
.map_err(RemoteAttemptError::Remote)?;
response response.error_for_status_ref()?;
.error_for_status_ref() response.json::<T>().await
.map_err(RemoteAttemptError::Remote)?;
response
.json::<T>()
.await
.map_err(RemoteAttemptError::Remote)
}, },
|_| false, |_| false,
3, 3,
u32::MAX, u32::MAX,
"calling control plane generation validation API", "calling control plane generation validation API",
backoff::Cancel::new(self.cancel.clone(), || RemoteAttemptError::Shutdown), &self.cancel,
) )
.await .await
{ .ok_or(RetryForeverError::ShuttingDown)?
Err(RemoteAttemptError::Shutdown) => Err(RetryForeverError::ShuttingDown), .expect("We retry forever, this should never be reached");
Err(RemoteAttemptError::Remote(_)) => {
panic!("We retry forever, this should never be reached"); Ok(res)
}
Ok(r) => Ok(r),
}
} }
} }

@@ -77,9 +77,11 @@ impl Deleter {
3, 3,
10, 10,
"executing deletion batch", "executing deletion batch",
backoff::Cancel::new(self.cancel.clone(), || anyhow::anyhow!("Shutting down")), &self.cancel,
) )
.await .await
.ok_or_else(|| anyhow::anyhow!("Shutting down"))
.and_then(|x| x)
} }
/// Block until everything in accumulator has been executed /// Block until everything in accumulator has been executed

@@ -3294,11 +3294,11 @@ impl Tenant {
3, 3,
u32::MAX, u32::MAX,
"persist_initdb_tar_zst", "persist_initdb_tar_zst",
backoff::Cancel::new(self.cancel.clone(), || anyhow::anyhow!("Cancelled")), &self.cancel,
) )
.await?; .await
.ok_or_else(|| anyhow::anyhow!("Cancelled"))
Ok(()) .and_then(|x| x)
} }
/// - run initdb to init temporary instance and get bootstrap data /// - run initdb to init temporary instance and get bootstrap data

@@ -91,9 +91,11 @@ async fn create_remote_delete_mark(
FAILED_UPLOAD_WARN_THRESHOLD, FAILED_UPLOAD_WARN_THRESHOLD,
FAILED_REMOTE_OP_RETRIES, FAILED_REMOTE_OP_RETRIES,
"mark_upload", "mark_upload",
backoff::Cancel::new(cancel.clone(), || anyhow::anyhow!("Cancelled")), cancel,
) )
.await .await
.ok_or_else(|| anyhow::anyhow!("Cancelled"))
.and_then(|x| x)
.context("mark_upload")?; .context("mark_upload")?;
Ok(()) Ok(())
@@ -187,9 +189,11 @@ async fn remove_tenant_remote_delete_mark(
FAILED_UPLOAD_WARN_THRESHOLD, FAILED_UPLOAD_WARN_THRESHOLD,
FAILED_REMOTE_OP_RETRIES, FAILED_REMOTE_OP_RETRIES,
"remove_tenant_remote_delete_mark", "remove_tenant_remote_delete_mark",
backoff::Cancel::new(cancel.clone(), || anyhow::anyhow!("Cancelled")), cancel,
) )
.await .await
.ok_or_else(|| anyhow::anyhow!("Cancelled"))
.and_then(|x| x)
.context("remove_tenant_remote_delete_mark")?; .context("remove_tenant_remote_delete_mark")?;
} }
Ok(()) Ok(())

@@ -1046,9 +1046,11 @@ impl RemoteTimelineClient {
// when executed as part of tenant deletion this happens in the background // when executed as part of tenant deletion this happens in the background
2, 2,
"persist_index_part_with_deleted_flag", "persist_index_part_with_deleted_flag",
backoff::Cancel::new(self.cancel.clone(), || anyhow::anyhow!("Cancelled")), &self.cancel,
) )
.await?; .await
.ok_or_else(|| anyhow::anyhow!("Cancelled"))
.and_then(|x| x)?;
// all good, disarm the guard and mark as success // all good, disarm the guard and mark as success
ScopeGuard::into_inner(undo_deleted_at); ScopeGuard::into_inner(undo_deleted_at);
@@ -1083,9 +1085,11 @@ impl RemoteTimelineClient {
FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_DOWNLOAD_WARN_THRESHOLD,
FAILED_REMOTE_OP_RETRIES, FAILED_REMOTE_OP_RETRIES,
"preserve_initdb_tar_zst", "preserve_initdb_tar_zst",
backoff::Cancel::new(cancel.clone(), || anyhow::anyhow!("Cancelled!")), &cancel.clone(),
) )
.await .await
.ok_or_else(|| anyhow::anyhow!("Cancellled"))
.and_then(|x| x)
.context("backing up initdb archive")?; .context("backing up initdb archive")?;
Ok(()) Ok(())
} }
@@ -1141,6 +1145,8 @@ impl RemoteTimelineClient {
// taking the burden of listing all the layers that we already know we should delete. // taking the burden of listing all the layers that we already know we should delete.
self.deletion_queue_client.flush_immediate().await?; self.deletion_queue_client.flush_immediate().await?;
let cancel = shutdown_token();
let remaining = backoff::retry( let remaining = backoff::retry(
|| async { || async {
self.storage_impl self.storage_impl
@@ -1151,9 +1157,11 @@ impl RemoteTimelineClient {
FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_DOWNLOAD_WARN_THRESHOLD,
FAILED_REMOTE_OP_RETRIES, FAILED_REMOTE_OP_RETRIES,
"list_prefixes", "list_prefixes",
backoff::Cancel::new(shutdown_token(), || anyhow::anyhow!("Cancelled!")), &cancel,
) )
.await .await
.ok_or_else(|| anyhow::anyhow!("Cancelled!"))
.and_then(|x| x)
.context("list prefixes")?; .context("list prefixes")?;
// We will delete the current index_part object last, since it acts as a deletion // We will delete the current index_part object last, since it acts as a deletion

@@ -76,7 +76,6 @@ pub async fn download_layer_file<'a>(
// If pageserver crashes the temp file will be deleted on startup and re-downloaded. // If pageserver crashes the temp file will be deleted on startup and re-downloaded.
let temp_file_path = path_with_suffix_extension(&local_path, TEMP_DOWNLOAD_EXTENSION); let temp_file_path = path_with_suffix_extension(&local_path, TEMP_DOWNLOAD_EXTENSION);
let cancel_inner = cancel.clone();
let (mut destination_file, bytes_amount) = download_retry( let (mut destination_file, bytes_amount) = download_retry(
|| async { || async {
let destination_file = tokio::fs::File::create(&temp_file_path) let destination_file = tokio::fs::File::create(&temp_file_path)
@@ -87,7 +86,7 @@ pub async fn download_layer_file<'a>(
// Cancellation safety: it is safe to cancel this future, because it isn't writing to a local // Cancellation safety: it is safe to cancel this future, because it isn't writing to a local
// file: the write to local file doesn't start until after the request header is returned // file: the write to local file doesn't start until after the request header is returned
// and we start draining the body stream below // and we start draining the body stream below
let download = download_cancellable(&cancel_inner, storage.download(&remote_path)) let download = download_cancellable(cancel, storage.download(&remote_path))
.await .await
.with_context(|| { .with_context(|| {
format!( format!(
@@ -107,7 +106,7 @@ pub async fn download_layer_file<'a>(
// we will imminiently try and write to again. // we will imminiently try and write to again.
let bytes_amount: u64 = match timeout_cancellable( let bytes_amount: u64 = match timeout_cancellable(
DOWNLOAD_TIMEOUT, DOWNLOAD_TIMEOUT,
&cancel_inner, cancel,
tokio::io::copy_buf(&mut reader, &mut destination_file), tokio::io::copy_buf(&mut reader, &mut destination_file),
) )
.await .await
@@ -386,9 +385,11 @@ pub(super) async fn download_index_part(
FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_DOWNLOAD_WARN_THRESHOLD,
FAILED_REMOTE_OP_RETRIES, FAILED_REMOTE_OP_RETRIES,
"listing index_part files", "listing index_part files",
backoff::Cancel::new(cancel.clone(), || anyhow::anyhow!("Cancelled")), &cancel,
) )
.await .await
.ok_or_else(|| anyhow::anyhow!("Cancelled"))
.and_then(|x| x)
.map_err(DownloadError::Other)?; .map_err(DownloadError::Other)?;
// General case logic for which index to use: the latest index whose generation // General case logic for which index to use: the latest index whose generation
@@ -510,7 +511,7 @@ pub(crate) async fn download_initdb_tar_zst(
/// Helper function to handle retries for a download operation. /// Helper function to handle retries for a download operation.
/// ///
/// Remote operations can fail due to rate limits (IAM, S3), spurious network /// Remote operations can fail due to rate limits (S3), spurious network
/// problems, or other external reasons. Retry FAILED_DOWNLOAD_RETRIES times, /// problems, or other external reasons. Retry FAILED_DOWNLOAD_RETRIES times,
/// with backoff. /// with backoff.
/// ///
@@ -530,9 +531,11 @@ where
FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_DOWNLOAD_WARN_THRESHOLD,
FAILED_REMOTE_OP_RETRIES, FAILED_REMOTE_OP_RETRIES,
description, description,
backoff::Cancel::new(cancel.clone(), || DownloadError::Cancelled), cancel,
) )
.await .await
.ok_or_else(|| DownloadError::Cancelled)
.and_then(|x| x)
} }
async fn download_retry_forever<T, O, F>( async fn download_retry_forever<T, O, F>(
@@ -550,7 +553,9 @@ where
FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_DOWNLOAD_WARN_THRESHOLD,
u32::MAX, u32::MAX,
description, description,
backoff::Cancel::new(cancel, || DownloadError::Cancelled), &cancel,
) )
.await .await
.ok_or_else(|| DownloadError::Cancelled)
.and_then(|x| x)
} }

@@ -188,16 +188,18 @@ pub(crate) async fn time_travel_recover_tenant(
backoff::retry( backoff::retry(
|| async { || async {
storage storage
.time_travel_recover(Some(prefix), timestamp, done_if_after, cancel.clone()) .time_travel_recover(Some(prefix), timestamp, done_if_after, cancel)
.await .await
}, },
|e| !matches!(e, TimeTravelError::Other(_)), |e| !matches!(e, TimeTravelError::Other(_)),
warn_after, warn_after,
max_attempts, max_attempts,
"time travel recovery of tenant prefix", "time travel recovery of tenant prefix",
backoff::Cancel::new(cancel.clone(), || TimeTravelError::Cancelled), cancel,
) )
.await?; .await
.ok_or_else(|| TimeTravelError::Cancelled)
.and_then(|x| x)?;
} }
Ok(()) Ok(())
} }

@@ -537,11 +537,11 @@ impl<'a> TenantDownloader<'a> {
FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_DOWNLOAD_WARN_THRESHOLD,
FAILED_REMOTE_OP_RETRIES, FAILED_REMOTE_OP_RETRIES,
"download heatmap", "download heatmap",
backoff::Cancel::new(self.secondary_state.cancel.clone(), || { &self.secondary_state.cancel,
UpdateError::Cancelled
}),
) )
.await?; .await
.ok_or_else(|| UpdateError::Cancelled)
.and_then(|x| x)?;
SECONDARY_MODE.download_heatmap.inc(); SECONDARY_MODE.download_heatmap.inc();

@@ -426,9 +426,11 @@ async fn upload_tenant_heatmap(
3, 3,
u32::MAX, u32::MAX,
"Uploading heatmap", "Uploading heatmap",
backoff::Cancel::new(tenant_cancel.clone(), || anyhow::anyhow!("Shutting down")), &tenant_cancel,
) )
.await .await
.ok_or_else(|| anyhow::anyhow!("Shutting down"))
.and_then(|x| x)
{ {
if tenant_cancel.is_cancelled() { if tenant_cancel.is_cancelled() {
return Err(UploadHeatmapError::Cancelled); return Err(UploadHeatmapError::Cancelled);

@@ -315,9 +315,11 @@ async fn upload_parquet(
FAILED_UPLOAD_MAX_RETRIES, FAILED_UPLOAD_MAX_RETRIES,
"request_data_upload", "request_data_upload",
// we don't want cancellation to interrupt here, so we make a dummy cancel token // we don't want cancellation to interrupt here, so we make a dummy cancel token
backoff::Cancel::new(CancellationToken::new(), || anyhow::anyhow!("Cancelled")), &CancellationToken::new(),
) )
.await .await
.ok_or_else(|| anyhow::anyhow!("Cancelled"))
.and_then(|x| x)
.context("request_data_upload")?; .context("request_data_upload")?;
Ok(buffer.writer()) Ok(buffer.writer())

@@ -558,16 +558,17 @@ pub async fn delete_timeline(ttid: &TenantTimelineId) -> Result<()> {
backoff::retry( backoff::retry(
|| async { || async {
let files = storage.list_files(Some(&remote_path)).await?; let files = storage.list_files(Some(&remote_path)).await?;
storage.delete_objects(&files).await?; storage.delete_objects(&files).await
Ok(())
}, },
|_| false, |_| false,
3, 3,
10, 10,
"executing WAL segments deletion batch", "executing WAL segments deletion batch",
backoff::Cancel::new(token, || anyhow::anyhow!("canceled")), &token,
) )
.await?; .await
.ok_or_else(|| anyhow::anyhow!("canceled"))
.and_then(|x| x)?;
Ok(()) Ok(())
} }