diff --git a/control_plane/attachment_service/src/compute_hook.rs b/control_plane/attachment_service/src/compute_hook.rs index 9c1185f259..4ca26431ca 100644 --- a/control_plane/attachment_service/src/compute_hook.rs +++ b/control_plane/attachment_service/src/compute_hook.rs @@ -244,9 +244,11 @@ impl ComputeHook { 3, 10, "Send compute notification", - backoff::Cancel::new(cancel.clone(), || NotifyError::ShuttingDown), + cancel, ) .await + .ok_or_else(|| NotifyError::ShuttingDown) + .and_then(|x| x) } /// Call this to notify the compute (postgres) tier of new pageservers to use diff --git a/libs/remote_storage/src/azure_blob.rs b/libs/remote_storage/src/azure_blob.rs index 57c57a2b70..c6d5224706 100644 --- a/libs/remote_storage/src/azure_blob.rs +++ b/libs/remote_storage/src/azure_blob.rs @@ -379,7 +379,7 @@ impl RemoteStorage for AzureBlobStorage { _prefix: Option<&RemotePath>, _timestamp: SystemTime, _done_if_after: SystemTime, - _cancel: CancellationToken, + _cancel: &CancellationToken, ) -> Result<(), TimeTravelError> { // TODO use Azure point in time recovery feature for this // https://learn.microsoft.com/en-us/azure/storage/blobs/point-in-time-restore-overview diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index 4aeaee70b1..e64b1de6f9 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -218,7 +218,7 @@ pub trait RemoteStorage: Send + Sync + 'static { prefix: Option<&RemotePath>, timestamp: SystemTime, done_if_after: SystemTime, - cancel: CancellationToken, + cancel: &CancellationToken, ) -> Result<(), TimeTravelError>; } @@ -442,7 +442,7 @@ impl GenericRemoteStorage> { prefix: Option<&RemotePath>, timestamp: SystemTime, done_if_after: SystemTime, - cancel: CancellationToken, + cancel: &CancellationToken, ) -> Result<(), TimeTravelError> { match self { Self::LocalFs(s) => { diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs index d47fa75b37..36ec15e1b1 100644 --- a/libs/remote_storage/src/local_fs.rs +++ b/libs/remote_storage/src/local_fs.rs @@ -431,7 +431,7 @@ impl RemoteStorage for LocalFs { _prefix: Option<&RemotePath>, _timestamp: SystemTime, _done_if_after: SystemTime, - _cancel: CancellationToken, + _cancel: &CancellationToken, ) -> Result<(), TimeTravelError> { Err(TimeTravelError::Unimplemented) } diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index 4d6564cba6..c9ad9ef225 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -638,7 +638,7 @@ impl RemoteStorage for S3Bucket { prefix: Option<&RemotePath>, timestamp: SystemTime, done_if_after: SystemTime, - cancel: CancellationToken, + cancel: &CancellationToken, ) -> Result<(), TimeTravelError> { let kind = RequestKind::TimeTravel; let _guard = self.permit(kind).await; @@ -678,9 +678,11 @@ impl RemoteStorage for S3Bucket { warn_threshold, max_retries, "listing object versions for time_travel_recover", - backoff::Cancel::new(cancel.clone(), || TimeTravelError::Cancelled), + cancel, ) - .await?; + .await + .ok_or_else(|| TimeTravelError::Cancelled) + .and_then(|x| x)?; tracing::trace!( " Got List response version_id_marker={:?}, key_marker={:?}", @@ -805,9 +807,11 @@ impl RemoteStorage for S3Bucket { warn_threshold, max_retries, "copying object version for time_travel_recover", - backoff::Cancel::new(cancel.clone(), || TimeTravelError::Cancelled), + cancel, ) - .await?; + .await + .ok_or_else(|| TimeTravelError::Cancelled) + .and_then(|x| x)?; 
tracing::info!(%version_id, %key, "Copied old version in S3"); } VerOrDelete { diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs index ee9792232a..82d5a61fda 100644 --- a/libs/remote_storage/src/simulate_failures.rs +++ b/libs/remote_storage/src/simulate_failures.rs @@ -190,7 +190,7 @@ impl RemoteStorage for UnreliableWrapper { prefix: Option<&RemotePath>, timestamp: SystemTime, done_if_after: SystemTime, - cancel: CancellationToken, + cancel: &CancellationToken, ) -> Result<(), TimeTravelError> { self.attempt(RemoteOp::TimeTravelRecover(prefix.map(|p| p.to_owned()))) .map_err(|e| TimeTravelError::Other(anyhow::Error::new(e)))?; diff --git a/libs/remote_storage/tests/test_real_s3.rs b/libs/remote_storage/tests/test_real_s3.rs index 679be66bf7..fc52dabc36 100644 --- a/libs/remote_storage/tests/test_real_s3.rs +++ b/libs/remote_storage/tests/test_real_s3.rs @@ -56,9 +56,10 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow: warn_threshold, max_retries, "test retry", - backoff::Cancel::new(CancellationToken::new(), || unreachable!()), + &CancellationToken::new(), ) .await + .expect("never cancelled") } async fn time_point() -> SystemTime { @@ -76,6 +77,8 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow: .collect::>()) } + let cancel = CancellationToken::new(); + let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str())) .with_context(|| "RemotePath conversion")?; @@ -142,7 +145,7 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow: // No changes after recovery to t2 (no-op) let t_final = time_point().await; ctx.client - .time_travel_recover(None, t2, t_final, CancellationToken::new()) + .time_travel_recover(None, t2, t_final, &cancel) .await?; let t2_files_recovered = list_files(&ctx.client).await?; println!("after recovery to t2: {t2_files_recovered:?}"); @@ -153,7 +156,7 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow: // after recovery to t1: path1 is back, path2 has the old content let t_final = time_point().await; ctx.client - .time_travel_recover(None, t1, t_final, CancellationToken::new()) + .time_travel_recover(None, t1, t_final, &cancel) .await?; let t1_files_recovered = list_files(&ctx.client).await?; println!("after recovery to t1: {t1_files_recovered:?}"); @@ -164,7 +167,7 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow: // after recovery to t0: everything is gone except for path1 let t_final = time_point().await; ctx.client - .time_travel_recover(None, t0, t_final, CancellationToken::new()) + .time_travel_recover(None, t0, t_final, &cancel) .await?; let t0_files_recovered = list_files(&ctx.client).await?; println!("after recovery to t0: {t0_files_recovered:?}"); diff --git a/libs/utils/src/backoff.rs b/libs/utils/src/backoff.rs index d50ad39585..096c7e5854 100644 --- a/libs/utils/src/backoff.rs +++ b/libs/utils/src/backoff.rs @@ -37,69 +37,53 @@ pub fn exponential_backoff_duration_seconds(n: u32, base_increment: f64, max_sec } } -/// Configure cancellation for a retried operation: when to cancel (the token), and -/// what kind of error to return on cancellation -pub struct Cancel<E, CF> -where - E: Display + Debug + 'static, - CF: Fn() -> E, -{ - token: CancellationToken, - on_cancel: CF, -} - -impl<E, CF> Cancel<E, CF> -where - E: Display + Debug + 'static, - CF: Fn() -> E, -{ - pub fn new(token: CancellationToken, on_cancel: CF) -> Self { - Self {
token, on_cancel } - } -} - -/// retries passed operation until one of the following conditions are met: -/// Encountered error is considered as permanent (non-retryable) -/// Retries have been exhausted. -/// `is_permanent` closure should be used to provide distinction between permanent/non-permanent errors -/// When attempts cross `warn_threshold` function starts to emit log warnings. +/// Retries the passed operation until one of the following conditions is met: +/// - the encountered error is considered permanent (non-retryable) +/// - retries have been exhausted +/// - the cancellation token has been cancelled +/// +/// The `is_permanent` closure should be used to distinguish permanent (non-retryable) errors from +/// transient ones. When attempts cross `warn_threshold`, the function starts to emit log warnings. /// `description` argument is added to log messages. Its value should identify the `op` is doing -/// `cancel` argument is required: any time we are looping on retry, we should be using a CancellationToken -/// to drop out promptly on shutdown. -pub async fn retry<T, O, F, E, CF>( +/// `cancel` cancels new attempts and the backoff sleep. +/// +/// Failed attempts are logged with `{:#}`, which prints the full error chain for anyhow errors but +/// not for other error types. The final failed attempt is logged with `{:?}`. +/// +/// Returns `None` if cancellation was noticed, otherwise `Some` with the terminal result. +pub async fn retry<T, O, F, E>( mut op: O, is_permanent: impl Fn(&E) -> bool, warn_threshold: u32, max_retries: u32, description: &str, - cancel: Cancel<E, CF>, -) -> Result<T, E> + cancel: &CancellationToken, +) -> Option<Result<T, E>> where // Not std::error::Error because anyhow::Error doesnt implement it. // For context see https://github.com/dtolnay/anyhow/issues/63 E: Display + Debug + 'static, O: FnMut() -> F, F: Future<Output = Result<T, E>>, - CF: Fn() -> E, { let mut attempts = 0; loop { - if cancel.token.is_cancelled() { - return Err((cancel.on_cancel)()); + if cancel.is_cancelled() { + return None; } let result = op().await; - match result { + match &result { Ok(_) => { if attempts > 0 { tracing::info!("{description} succeeded after {attempts} retries"); } - return result; + return Some(result); } // These are "permanent" errors that should not be retried. - Err(ref e) if is_permanent(e) => { - return result; + Err(e) if is_permanent(e) => { + return Some(result); } // Assume that any other failure might be transient, and the operation might // succeed if we just keep trying. @@ -109,12 +93,12 @@ where Err(err) if attempts < max_retries => { tracing::warn!("{description} failed, will retry (attempt {attempts}): {err:#}"); } - Err(ref err) => { + Err(err) => { // Operation failed `max_attempts` times. Time to give up.
tracing::warn!( "{description} still failed after {attempts} retries, giving up: {err:?}" ); - return result; + return Some(result); } } // sleep and retry @@ -122,7 +106,7 @@ where attempts, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, - &cancel.token, + cancel, ) .await; attempts += 1; @@ -131,11 +115,9 @@ where #[cfg(test)] mod tests { - use std::io; - - use tokio::sync::Mutex; - use super::*; + use std::io; + use tokio::sync::Mutex; #[test] fn backoff_defaults_produce_growing_backoff_sequence() { @@ -166,7 +148,7 @@ mod tests { #[tokio::test(start_paused = true)] async fn retry_always_error() { let count = Mutex::new(0); - let err_result = retry( + retry( || async { *count.lock().await += 1; Result::<(), io::Error>::Err(io::Error::from(io::ErrorKind::Other)) @@ -175,11 +157,11 @@ mod tests { 1, 1, "work", - Cancel::new(CancellationToken::new(), || -> io::Error { unreachable!() }), + &CancellationToken::new(), ) - .await; - - assert!(err_result.is_err()); + .await + .expect("not cancelled") + .expect_err("it can only fail"); assert_eq!(*count.lock().await, 2); } @@ -201,10 +183,11 @@ mod tests { 2, 2, "work", - Cancel::new(CancellationToken::new(), || -> io::Error { unreachable!() }), + &CancellationToken::new(), ) .await - .unwrap(); + .expect("not cancelled") + .expect("success on second try"); } #[tokio::test(start_paused = true)] @@ -224,10 +207,11 @@ mod tests { 2, 2, "work", - Cancel::new(CancellationToken::new(), || -> io::Error { unreachable!() }), + &CancellationToken::new(), ) .await - .unwrap_err(); + .expect("was not cancellation") + .expect_err("it was permanent error"); assert_eq!(*count.lock().await, 1); } diff --git a/pageserver/src/consumption_metrics/upload.rs b/pageserver/src/consumption_metrics/upload.rs index 322ed95cc8..6b840a3136 100644 --- a/pageserver/src/consumption_metrics/upload.rs +++ b/pageserver/src/consumption_metrics/upload.rs @@ -262,35 +262,33 @@ async fn upload( ) -> Result<(), UploadError> { let warn_after = 3; let max_attempts = 10; + + // this is used only with tests so far + let last_value = if is_last { "true" } else { "false" }; + let res = utils::backoff::retry( - move || { - let body = body.clone(); - async move { - let res = client - .post(metric_collection_endpoint.clone()) - .header(reqwest::header::CONTENT_TYPE, "application/json") - .header( - LAST_IN_BATCH.clone(), - if is_last { "true" } else { "false" }, - ) - .body(body) - .send() - .await; + || async { + let res = client + .post(metric_collection_endpoint.clone()) + .header(reqwest::header::CONTENT_TYPE, "application/json") + .header(LAST_IN_BATCH.clone(), last_value) + .body(body.clone()) + .send() + .await; - let res = res.and_then(|res| res.error_for_status()); + let res = res.and_then(|res| res.error_for_status()); - // 10 redirects are normally allowed, so we don't need worry about 3xx - match res { - Ok(_response) => Ok(()), - Err(e) => { - let status = e.status().filter(|s| s.is_client_error()); - if let Some(status) = status { - // rejection used to be a thing when the server could reject a - // whole batch of metrics if one metric was bad. - Err(UploadError::Rejected(status)) - } else { - Err(UploadError::Reqwest(e)) - } + // 10 redirects are normally allowed, so we don't need worry about 3xx + match res { + Ok(_response) => Ok(()), + Err(e) => { + let status = e.status().filter(|s| s.is_client_error()); + if let Some(status) = status { + // rejection used to be a thing when the server could reject a + // whole batch of metrics if one metric was bad. 
+ Err(UploadError::Rejected(status)) + } else { + Err(UploadError::Reqwest(e)) } } } @@ -299,9 +297,11 @@ async fn upload( warn_after, max_attempts, "upload consumption_metrics", - utils::backoff::Cancel::new(cancel.clone(), || UploadError::Cancelled), + cancel, ) - .await; + .await + .ok_or_else(|| UploadError::Cancelled) + .and_then(|x| x); match &res { Ok(_) => {} diff --git a/pageserver/src/control_plane_client.rs b/pageserver/src/control_plane_client.rs index 950791ea48..61c7d03408 100644 --- a/pageserver/src/control_plane_client.rs +++ b/pageserver/src/control_plane_client.rs @@ -82,46 +82,29 @@ impl ControlPlaneClient { R: Serialize, T: DeserializeOwned, { - #[derive(thiserror::Error, Debug)] - enum RemoteAttemptError { - #[error("shutdown")] - Shutdown, - #[error("remote: {0}")] - Remote(reqwest::Error), - } - - match backoff::retry( + let res = backoff::retry( || async { let response = self .http_client .post(url.clone()) .json(&request) .send() - .await - .map_err(RemoteAttemptError::Remote)?; + .await?; - response - .error_for_status_ref() - .map_err(RemoteAttemptError::Remote)?; - response - .json::<T>() - .await - .map_err(RemoteAttemptError::Remote) + response.error_for_status_ref()?; + response.json::<T>().await }, |_| false, 3, u32::MAX, "calling control plane generation validation API", - backoff::Cancel::new(self.cancel.clone(), || RemoteAttemptError::Shutdown), + &self.cancel, ) .await - { - Err(RemoteAttemptError::Shutdown) => Err(RetryForeverError::ShuttingDown), - Err(RemoteAttemptError::Remote(_)) => { - panic!("We retry forever, this should never be reached"); - } - Ok(r) => Ok(r), - } + .ok_or(RetryForeverError::ShuttingDown)? + .expect("We retry forever, this should never be reached"); + + Ok(res) } } diff --git a/pageserver/src/deletion_queue/deleter.rs b/pageserver/src/deletion_queue/deleter.rs index 57421b1547..a75c73f2b1 100644 --- a/pageserver/src/deletion_queue/deleter.rs +++ b/pageserver/src/deletion_queue/deleter.rs @@ -77,9 +77,11 @@ impl Deleter { 3, 10, "executing deletion batch", - backoff::Cancel::new(self.cancel.clone(), || anyhow::anyhow!("Shutting down")), + &self.cancel, ) .await + .ok_or_else(|| anyhow::anyhow!("Shutting down")) + .and_then(|x| x) } /// Block until everything in accumulator has been executed diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index b801347c06..624c3e365f 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -3294,11 +3294,11 @@ impl Tenant { 3, u32::MAX, "persist_initdb_tar_zst", - backoff::Cancel::new(self.cancel.clone(), || anyhow::anyhow!("Cancelled")), + &self.cancel, ) - .await?; - - Ok(()) + .await + .ok_or_else(|| anyhow::anyhow!("Cancelled")) + .and_then(|x| x) } /// - run initdb to init temporary instance and get bootstrap data diff --git a/pageserver/src/tenant/delete.rs b/pageserver/src/tenant/delete.rs index 0dbaa3ec93..7c35914b61 100644 --- a/pageserver/src/tenant/delete.rs +++ b/pageserver/src/tenant/delete.rs @@ -91,9 +91,11 @@ async fn create_remote_delete_mark( FAILED_UPLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, "mark_upload", - backoff::Cancel::new(cancel.clone(), || anyhow::anyhow!("Cancelled")), + cancel, ) .await + .ok_or_else(|| anyhow::anyhow!("Cancelled")) + .and_then(|x| x) .context("mark_upload")?; Ok(()) @@ -187,9 +189,11 @@ async fn remove_tenant_remote_delete_mark( FAILED_UPLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, "remove_tenant_remote_delete_mark", - backoff::Cancel::new(cancel.clone(), || anyhow::anyhow!("Cancelled")), + cancel, ) .await + 
.ok_or_else(|| anyhow::anyhow!("Cancelled")) + .and_then(|x| x) .context("remove_tenant_remote_delete_mark")?; } Ok(()) } diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 2e429ee9bc..831a073d17 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -1046,9 +1046,11 @@ impl RemoteTimelineClient { // when executed as part of tenant deletion this happens in the background 2, "persist_index_part_with_deleted_flag", - backoff::Cancel::new(self.cancel.clone(), || anyhow::anyhow!("Cancelled")), + &self.cancel, ) - .await?; + .await + .ok_or_else(|| anyhow::anyhow!("Cancelled")) + .and_then(|x| x)?; // all good, disarm the guard and mark as success ScopeGuard::into_inner(undo_deleted_at); @@ -1083,9 +1085,11 @@ impl RemoteTimelineClient { FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, "preserve_initdb_tar_zst", - backoff::Cancel::new(cancel.clone(), || anyhow::anyhow!("Cancelled!")), + &cancel.clone(), ) .await + .ok_or_else(|| anyhow::anyhow!("Cancelled")) + .and_then(|x| x) .context("backing up initdb archive")?; Ok(()) } @@ -1141,6 +1145,8 @@ impl RemoteTimelineClient { // taking the burden of listing all the layers that we already know we should delete. self.deletion_queue_client.flush_immediate().await?; + let cancel = shutdown_token(); + let remaining = backoff::retry( || async { self.storage_impl @@ -1151,9 +1157,11 @@ impl RemoteTimelineClient { FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, "list_prefixes", - backoff::Cancel::new(shutdown_token(), || anyhow::anyhow!("Cancelled!")), + &cancel, ) .await + .ok_or_else(|| anyhow::anyhow!("Cancelled!")) + .and_then(|x| x) .context("list prefixes")?; // We will delete the current index_part object last, since it acts as a deletion diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index b84b5ca33b..2c50726b43 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -76,7 +76,6 @@ pub async fn download_layer_file<'a>( // If pageserver crashes the temp file will be deleted on startup and re-downloaded. let temp_file_path = path_with_suffix_extension(&local_path, TEMP_DOWNLOAD_EXTENSION); - let cancel_inner = cancel.clone(); let (mut destination_file, bytes_amount) = download_retry( || async { let destination_file = tokio::fs::File::create(&temp_file_path) .await @@ -87,7 +86,7 @@ pub async fn download_layer_file<'a>( // Cancellation safety: it is safe to cancel this future, because it isn't writing to a local // file: the write to local file doesn't start until after the request header is returned // and we start draining the body stream below - let download = download_cancellable(&cancel_inner, storage.download(&remote_path)) + let download = download_cancellable(cancel, storage.download(&remote_path)) .await .with_context(|| { format!( @@ -107,7 +106,7 @@ pub async fn download_layer_file<'a>( // we will imminiently try and write to again.
let bytes_amount: u64 = match timeout_cancellable( DOWNLOAD_TIMEOUT, - &cancel_inner, + cancel, tokio::io::copy_buf(&mut reader, &mut destination_file), ) .await @@ -386,9 +385,11 @@ pub(super) async fn download_index_part( FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, "listing index_part files", - backoff::Cancel::new(cancel.clone(), || anyhow::anyhow!("Cancelled")), + &cancel, ) .await + .ok_or_else(|| anyhow::anyhow!("Cancelled")) + .and_then(|x| x) .map_err(DownloadError::Other)?; // General case logic for which index to use: the latest index whose generation @@ -510,7 +511,7 @@ pub(crate) async fn download_initdb_tar_zst( /// Helper function to handle retries for a download operation. /// -/// Remote operations can fail due to rate limits (IAM, S3), spurious network +/// Remote operations can fail due to rate limits (S3), spurious network /// problems, or other external reasons. Retry FAILED_DOWNLOAD_RETRIES times, /// with backoff. /// @@ -530,9 +531,11 @@ where FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, description, - backoff::Cancel::new(cancel.clone(), || DownloadError::Cancelled), + cancel, ) .await + .ok_or_else(|| DownloadError::Cancelled) + .and_then(|x| x) } async fn download_retry_forever( @@ -550,7 +553,9 @@ where FAILED_DOWNLOAD_WARN_THRESHOLD, u32::MAX, description, - backoff::Cancel::new(cancel, || DownloadError::Cancelled), + &cancel, ) .await + .ok_or_else(|| DownloadError::Cancelled) + .and_then(|x| x) } diff --git a/pageserver/src/tenant/remote_timeline_client/upload.rs b/pageserver/src/tenant/remote_timeline_client/upload.rs index 76df9ba5c4..e8ba1d3d6e 100644 --- a/pageserver/src/tenant/remote_timeline_client/upload.rs +++ b/pageserver/src/tenant/remote_timeline_client/upload.rs @@ -188,16 +188,18 @@ pub(crate) async fn time_travel_recover_tenant( backoff::retry( || async { storage - .time_travel_recover(Some(prefix), timestamp, done_if_after, cancel.clone()) + .time_travel_recover(Some(prefix), timestamp, done_if_after, cancel) .await }, |e| !matches!(e, TimeTravelError::Other(_)), warn_after, max_attempts, "time travel recovery of tenant prefix", - backoff::Cancel::new(cancel.clone(), || TimeTravelError::Cancelled), + cancel, ) - .await?; + .await + .ok_or_else(|| TimeTravelError::Cancelled) + .and_then(|x| x)?; } Ok(()) } diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs index 702c0b1ec1..55af4f9f2b 100644 --- a/pageserver/src/tenant/secondary/downloader.rs +++ b/pageserver/src/tenant/secondary/downloader.rs @@ -537,11 +537,11 @@ impl<'a> TenantDownloader<'a> { FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, "download heatmap", - backoff::Cancel::new(self.secondary_state.cancel.clone(), || { - UpdateError::Cancelled - }), + &self.secondary_state.cancel, ) - .await?; + .await + .ok_or_else(|| UpdateError::Cancelled) + .and_then(|x| x)?; SECONDARY_MODE.download_heatmap.inc(); diff --git a/pageserver/src/tenant/secondary/heatmap_uploader.rs b/pageserver/src/tenant/secondary/heatmap_uploader.rs index df865658a4..fff29b2487 100644 --- a/pageserver/src/tenant/secondary/heatmap_uploader.rs +++ b/pageserver/src/tenant/secondary/heatmap_uploader.rs @@ -426,9 +426,11 @@ async fn upload_tenant_heatmap( 3, u32::MAX, "Uploading heatmap", - backoff::Cancel::new(tenant_cancel.clone(), || anyhow::anyhow!("Shutting down")), + &tenant_cancel, ) .await + .ok_or_else(|| anyhow::anyhow!("Shutting down")) + .and_then(|x| x) { if tenant_cancel.is_cancelled() { return 
Err(UploadHeatmapError::Cancelled); diff --git a/proxy/src/context/parquet.rs b/proxy/src/context/parquet.rs index e920d7be01..8510c5c586 100644 --- a/proxy/src/context/parquet.rs +++ b/proxy/src/context/parquet.rs @@ -315,9 +315,11 @@ async fn upload_parquet( FAILED_UPLOAD_MAX_RETRIES, "request_data_upload", // we don't want cancellation to interrupt here, so we make a dummy cancel token - backoff::Cancel::new(CancellationToken::new(), || anyhow::anyhow!("Cancelled")), + &CancellationToken::new(), ) .await + .ok_or_else(|| anyhow::anyhow!("Cancelled")) + .and_then(|x| x) .context("request_data_upload")?; Ok(buffer.writer()) diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs index c47381351d..df99244770 100644 --- a/safekeeper/src/wal_backup.rs +++ b/safekeeper/src/wal_backup.rs @@ -558,16 +558,17 @@ pub async fn delete_timeline(ttid: &TenantTimelineId) -> Result<()> { backoff::retry( || async { let files = storage.list_files(Some(&remote_path)).await?; - storage.delete_objects(&files).await?; - Ok(()) + storage.delete_objects(&files).await }, |_| false, 3, 10, "executing WAL segments deletion batch", - backoff::Cancel::new(token, || anyhow::anyhow!("canceled")), + &token, ) - .await?; + .await + .ok_or_else(|| anyhow::anyhow!("canceled")) + .and_then(|x| x)?; Ok(()) }
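For illustration only, a minimal sketch of how a call site uses the new `backoff::retry` signature introduced above: the function borrows a `CancellationToken` and returns `Option<Result<T, E>>`, so callers map `None` back to their own shutdown error and flatten the nested result with `.and_then(|x| x)`. `fetch_blob`, `fetch_blob_with_retries`, and the "shutting down" error are hypothetical placeholders; only `utils::backoff::retry` and `CancellationToken` come from the patch.

use tokio_util::sync::CancellationToken;

// Hypothetical fallible remote operation; stands in for an S3/HTTP call.
async fn fetch_blob() -> anyhow::Result<Vec<u8>> {
    Ok(Vec::new())
}

// Caller adapted to the new signature: `retry` borrows the token and returns
// `Option<Result<T, E>>`, where `None` means the token was cancelled.
async fn fetch_blob_with_retries(cancel: &CancellationToken) -> anyhow::Result<Vec<u8>> {
    utils::backoff::retry(
        || async { fetch_blob().await },
        |_| false,          // no error is treated as permanent; keep retrying
        3,                  // warn_threshold
        10,                 // max_retries
        "fetching a blob",
        cancel,             // &CancellationToken instead of the old backoff::Cancel
    )
    .await
    // `None` => cancellation was observed before or between attempts.
    .ok_or_else(|| anyhow::anyhow!("shutting down"))
    // `Some(Result<..>)` => flatten into the final result.
    .and_then(|x| x)
}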