diff --git a/Cargo.lock b/Cargo.lock index 01396c4230..23741e2a3d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4915,7 +4915,6 @@ dependencies = [ "thiserror", "tokio", "tokio-stream", - "tokio-util", "tracing", "tracing-error", "tracing-subscriber", diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml index 7315cda50e..f01131540e 100644 --- a/libs/utils/Cargo.toml +++ b/libs/utils/Cargo.toml @@ -26,7 +26,6 @@ serde_json.workspace = true signal-hook.workspace = true thiserror.workspace = true tokio.workspace = true -tokio-util.workspace = true tracing.workspace = true tracing-error.workspace = true tracing-subscriber = { workspace = true, features = ["json", "registry"] } diff --git a/libs/utils/src/backoff.rs b/libs/utils/src/backoff.rs index 859bec940f..4507efe647 100644 --- a/libs/utils/src/backoff.rs +++ b/libs/utils/src/backoff.rs @@ -1,27 +1,18 @@ use std::fmt::{Debug, Display}; use futures::Future; -use tokio_util::sync::CancellationToken; pub const DEFAULT_BASE_BACKOFF_SECONDS: f64 = 0.1; pub const DEFAULT_MAX_BACKOFF_SECONDS: f64 = 3.0; -pub async fn exponential_backoff( - n: u32, - base_increment: f64, - max_seconds: f64, - cancel: &CancellationToken, -) { +pub async fn exponential_backoff(n: u32, base_increment: f64, max_seconds: f64) { let backoff_duration_seconds = exponential_backoff_duration_seconds(n, base_increment, max_seconds); if backoff_duration_seconds > 0.0 { tracing::info!( "Backoff: waiting {backoff_duration_seconds} seconds before processing with the task", ); - tokio::select! { - _ = tokio::time::sleep(std::time::Duration::from_secs_f64(backoff_duration_seconds)) => {}, - _ = cancel.cancelled() => {} - } + tokio::time::sleep(std::time::Duration::from_secs_f64(backoff_duration_seconds)).await; } } @@ -33,57 +24,28 @@ pub fn exponential_backoff_duration_seconds(n: u32, base_increment: f64, max_sec } } -/// Configure cancellation for a retried operation: when to cancel (the token), and -/// what kind of error to return on cancellation -pub struct Cancel -where - E: Display + Debug + 'static, - CF: Fn() -> E, -{ - token: CancellationToken, - on_cancel: CF, -} - -impl Cancel -where - E: Display + Debug + 'static, - CF: Fn() -> E, -{ - pub fn new(token: CancellationToken, on_cancel: CF) -> Self { - Self { token, on_cancel } - } -} - /// retries passed operation until one of the following conditions are met: /// Encountered error is considered as permanent (non-retryable) /// Retries have been exhausted. /// `is_permanent` closure should be used to provide distinction between permanent/non-permanent errors /// When attempts cross `warn_threshold` function starts to emit log warnings. /// `description` argument is added to log messages. Its value should identify the `op` is doing -/// `cancel` argument is required: any time we are looping on retry, we should be using a CancellationToken -/// to drop out promptly on shutdown. -pub async fn retry( +pub async fn retry( mut op: O, is_permanent: impl Fn(&E) -> bool, warn_threshold: u32, max_retries: u32, description: &str, - cancel: Cancel, ) -> Result where // Not std::error::Error because anyhow::Error doesnt implement it. // For context see https://github.com/dtolnay/anyhow/issues/63 - E: Display + Debug + 'static, + E: Display + Debug, O: FnMut() -> F, F: Future>, - CF: Fn() -> E, { let mut attempts = 0; loop { - if cancel.token.is_cancelled() { - return Err((cancel.on_cancel)()); - } - let result = op().await; match result { Ok(_) => { @@ -118,7 +80,6 @@ where attempts, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, - &cancel.token, ) .await; attempts += 1; @@ -171,7 +132,6 @@ mod tests { 1, 1, "work", - Cancel::new(CancellationToken::new(), || -> io::Error { unreachable!() }), ) .await; @@ -197,7 +157,6 @@ mod tests { 2, 2, "work", - Cancel::new(CancellationToken::new(), || -> io::Error { unreachable!() }), ) .await .unwrap(); @@ -220,7 +179,6 @@ mod tests { 2, 2, "work", - Cancel::new(CancellationToken::new(), || -> io::Error { unreachable!() }), ) .await .unwrap_err(); diff --git a/pageserver/src/tenant/delete.rs b/pageserver/src/tenant/delete.rs index f5d4142f33..de509cd3de 100644 --- a/pageserver/src/tenant/delete.rs +++ b/pageserver/src/tenant/delete.rs @@ -7,7 +7,6 @@ use anyhow::Context; use pageserver_api::models::TenantState; use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath}; use tokio::sync::OwnedMutexGuard; -use tokio_util::sync::CancellationToken; use tracing::{error, info, instrument, warn, Instrument, Span}; use utils::{ @@ -83,8 +82,6 @@ async fn create_remote_delete_mark( FAILED_UPLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, "mark_upload", - // TODO: thread a cancellation token into this code path - backoff::Cancel::new(CancellationToken::new(), || unreachable!()), ) .await .context("mark_upload")?; @@ -174,8 +171,6 @@ async fn remove_tenant_remote_delete_mark( FAILED_UPLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, "remove_tenant_remote_delete_mark", - // TODO: thread a cancellation token into this code path - backoff::Cancel::new(CancellationToken::new(), || unreachable!()), ) .await .context("remove_tenant_remote_delete_mark")?; @@ -257,8 +252,6 @@ pub(crate) async fn remote_delete_mark_exists( SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS, SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS, "fetch_tenant_deletion_mark", - // TODO: thread a cancellation token into this code path - backoff::Cancel::new(CancellationToken::new(), || unreachable!()), ) .await; diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 1960183285..ab116de090 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -214,7 +214,6 @@ use chrono::{NaiveDateTime, Utc}; // re-export these pub use download::{is_temp_download_file, list_remote_timelines}; use scopeguard::ScopeGuard; -use tokio_util::sync::CancellationToken; use utils::backoff::{ self, exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, }; @@ -236,7 +235,6 @@ use crate::metrics::{ RemoteTimelineClientMetricsCallTrackSize, REMOTE_ONDEMAND_DOWNLOADED_BYTES, REMOTE_ONDEMAND_DOWNLOADED_LAYERS, }; -use crate::task_mgr::shutdown_token; use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id; use crate::tenant::remote_timeline_client::index::LayerFileMetadata; use crate::{ @@ -754,7 +752,7 @@ impl RemoteTimelineClient { pausable_failpoint!("persist_deleted_index_part"); backoff::retry( - || { + || async { upload::upload_index_part( self.conf, &self.storage_impl, @@ -762,6 +760,7 @@ impl RemoteTimelineClient { &self.timeline_id, &index_part_with_deleted_at, ) + .await }, |_e| false, 1, @@ -770,7 +769,6 @@ impl RemoteTimelineClient { // when executed as part of tenant deletion this happens in the background 2, "persist_index_part_with_deleted_flag", - backoff::Cancel::new(CancellationToken::new(), || unreachable!()), ) .await?; @@ -854,7 +852,6 @@ impl RemoteTimelineClient { FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, "list_prefixes", - backoff::Cancel::new(shutdown_token(), || anyhow::anyhow!("Cancelled!")), ) .await .context("list prefixes")?; @@ -1100,14 +1097,12 @@ impl RemoteTimelineClient { } // sleep until it's time to retry, or we're cancelled - let cancel = shutdown_token(); tokio::select! { _ = task_mgr::shutdown_watcher() => { }, _ = exponential_backoff( retries, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, - &cancel ) => { }, }; } diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index 3f5074e7d1..831dc62f33 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -11,7 +11,6 @@ use std::time::Duration; use anyhow::{anyhow, Context}; use tokio::fs; use tokio::io::AsyncWriteExt; -use tokio_util::sync::CancellationToken; use utils::{backoff, crashsafe}; use crate::config::PageServerConf; @@ -267,10 +266,6 @@ where FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, description, - // TODO: pass a cancellation token into this function - backoff::Cancel::new(CancellationToken::new(), || -> DownloadError { - unreachable!() - }), ) .await } diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index 4eef0f9eff..d52b29f175 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -17,7 +17,7 @@ use crate::metrics::{ WALRECEIVER_ACTIVE_MANAGERS, WALRECEIVER_BROKER_UPDATES, WALRECEIVER_CANDIDATES_ADDED, WALRECEIVER_CANDIDATES_REMOVED, WALRECEIVER_SWITCHES, }; -use crate::task_mgr::{shutdown_token, TaskKind}; +use crate::task_mgr::TaskKind; use crate::tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline}; use anyhow::Context; use chrono::{NaiveDateTime, Utc}; @@ -211,14 +211,11 @@ async fn subscribe_for_timeline_updates( id: TenantTimelineId, ) -> Streaming { let mut attempt = 0; - let cancel = shutdown_token(); - loop { exponential_backoff( attempt, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, - &cancel, ) .await; attempt += 1;