diff --git a/Cargo.lock b/Cargo.lock index 4a9bbaec54..adf2594dbb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4915,6 +4915,7 @@ dependencies = [ "thiserror", "tokio", "tokio-stream", + "tokio-util", "tracing", "tracing-error", "tracing-subscriber", diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml index f01131540e..7315cda50e 100644 --- a/libs/utils/Cargo.toml +++ b/libs/utils/Cargo.toml @@ -26,6 +26,7 @@ serde_json.workspace = true signal-hook.workspace = true thiserror.workspace = true tokio.workspace = true +tokio-util.workspace = true tracing.workspace = true tracing-error.workspace = true tracing-subscriber = { workspace = true, features = ["json", "registry"] } diff --git a/libs/utils/src/backoff.rs b/libs/utils/src/backoff.rs index 4507efe647..d50ad39585 100644 --- a/libs/utils/src/backoff.rs +++ b/libs/utils/src/backoff.rs @@ -1,18 +1,31 @@ use std::fmt::{Debug, Display}; use futures::Future; +use tokio_util::sync::CancellationToken; pub const DEFAULT_BASE_BACKOFF_SECONDS: f64 = 0.1; pub const DEFAULT_MAX_BACKOFF_SECONDS: f64 = 3.0; -pub async fn exponential_backoff(n: u32, base_increment: f64, max_seconds: f64) { +pub async fn exponential_backoff( + n: u32, + base_increment: f64, + max_seconds: f64, + cancel: &CancellationToken, +) { let backoff_duration_seconds = exponential_backoff_duration_seconds(n, base_increment, max_seconds); if backoff_duration_seconds > 0.0 { tracing::info!( "Backoff: waiting {backoff_duration_seconds} seconds before processing with the task", ); - tokio::time::sleep(std::time::Duration::from_secs_f64(backoff_duration_seconds)).await; + + drop( + tokio::time::timeout( + std::time::Duration::from_secs_f64(backoff_duration_seconds), + cancel.cancelled(), + ) + .await, + ) } } @@ -24,28 +37,57 @@ pub fn exponential_backoff_duration_seconds(n: u32, base_increment: f64, max_sec } } +/// Configure cancellation for a retried operation: when to cancel (the token), and +/// what kind of error to return on cancellation +pub struct 
Cancel<E, CF> +where + E: Display + Debug + 'static, + CF: Fn() -> E, +{ + token: CancellationToken, + on_cancel: CF, +} + +impl<E, CF> Cancel<E, CF> +where + E: Display + Debug + 'static, + CF: Fn() -> E, +{ + pub fn new(token: CancellationToken, on_cancel: CF) -> Self { + Self { token, on_cancel } + } +} + /// retries passed operation until one of the following conditions are met: /// Encountered error is considered as permanent (non-retryable) /// Retries have been exhausted. /// `is_permanent` closure should be used to provide distinction between permanent/non-permanent errors /// When attempts cross `warn_threshold` function starts to emit log warnings. /// `description` argument is added to log messages. Its value should identify the `op` is doing -pub async fn retry<T, O, F, E>( +/// `cancel` argument is required: any time we are looping on retry, we should be using a CancellationToken +/// to drop out promptly on shutdown. +pub async fn retry<T, O, F, E, CF>( mut op: O, is_permanent: impl Fn(&E) -> bool, warn_threshold: u32, max_retries: u32, description: &str, + cancel: Cancel<E, CF>, ) -> Result<T, E> where // Not std::error::Error because anyhow::Error doesnt implement it. 
// For context see https://github.com/dtolnay/anyhow/issues/63 - E: Display + Debug, + E: Display + Debug + 'static, O: FnMut() -> F, F: Future<Output = Result<T, E>>, + CF: Fn() -> E, { let mut attempts = 0; loop { + if cancel.token.is_cancelled() { + return Err((cancel.on_cancel)()); + } + let result = op().await; match result { Ok(_) => { @@ -80,6 +122,7 @@ where attempts, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, + &cancel.token, ) .await; attempts += 1; @@ -132,6 +175,7 @@ mod tests { 1, 1, "work", + Cancel::new(CancellationToken::new(), || -> io::Error { unreachable!() }), ) .await; @@ -157,6 +201,7 @@ mod tests { 2, 2, "work", + Cancel::new(CancellationToken::new(), || -> io::Error { unreachable!() }), ) .await .unwrap(); @@ -179,6 +224,7 @@ mod tests { 2, 2, "work", + Cancel::new(CancellationToken::new(), || -> io::Error { unreachable!() }), ) .await .unwrap_err(); diff --git a/pageserver/src/tenant/delete.rs b/pageserver/src/tenant/delete.rs index de509cd3de..5c23af8036 100644 --- a/pageserver/src/tenant/delete.rs +++ b/pageserver/src/tenant/delete.rs @@ -7,6 +7,7 @@ use anyhow::Context; use pageserver_api::models::TenantState; use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath}; use tokio::sync::OwnedMutexGuard; +use tokio_util::sync::CancellationToken; use tracing::{error, info, instrument, warn, Instrument, Span}; use utils::{ @@ -82,6 +83,8 @@ async fn create_remote_delete_mark( FAILED_UPLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, "mark_upload", + // TODO: use a cancellation token (https://github.com/neondatabase/neon/issues/5066) + backoff::Cancel::new(CancellationToken::new(), || unreachable!()), ) .await .context("mark_upload")?; @@ -171,6 +174,8 @@ async fn remove_tenant_remote_delete_mark( FAILED_UPLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, "remove_tenant_remote_delete_mark", + // TODO: use a cancellation token (https://github.com/neondatabase/neon/issues/5066) + backoff::Cancel::new(CancellationToken::new(), || 
unreachable!()), ) .await .context("remove_tenant_remote_delete_mark")?; @@ -252,6 +257,8 @@ pub(crate) async fn remote_delete_mark_exists( SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS, SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS, "fetch_tenant_deletion_mark", + // TODO: use a cancellation token (https://github.com/neondatabase/neon/issues/5066) + backoff::Cancel::new(CancellationToken::new(), || unreachable!()), ) .await; diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 3193a7eb57..0324197da6 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -211,6 +211,7 @@ use chrono::{NaiveDateTime, Utc}; // re-export these pub use download::{is_temp_download_file, list_remote_timelines}; use scopeguard::ScopeGuard; +use tokio_util::sync::CancellationToken; use utils::backoff::{ self, exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, }; @@ -231,6 +232,7 @@ use crate::metrics::{ RemoteTimelineClientMetricsCallTrackSize, REMOTE_ONDEMAND_DOWNLOADED_BYTES, REMOTE_ONDEMAND_DOWNLOADED_LAYERS, }; +use crate::task_mgr::shutdown_token; use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id; use crate::tenant::remote_timeline_client::index::LayerFileMetadata; use crate::tenant::upload_queue::Delete; @@ -754,7 +756,7 @@ impl RemoteTimelineClient { pausable_failpoint!("persist_deleted_index_part"); backoff::retry( - || async { + || { upload::upload_index_part( self.conf, &self.storage_impl, @@ -762,7 +764,6 @@ impl RemoteTimelineClient { &self.timeline_id, &index_part_with_deleted_at, ) - .await }, |_e| false, 1, @@ -771,6 +772,8 @@ impl RemoteTimelineClient { // when executed as part of tenant deletion this happens in the background 2, "persist_index_part_with_deleted_flag", + // TODO: use a cancellation token (https://github.com/neondatabase/neon/issues/5066) + backoff::Cancel::new(CancellationToken::new(), || 
unreachable!()), ) .await?; @@ -857,6 +860,7 @@ impl RemoteTimelineClient { FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, "list_prefixes", + backoff::Cancel::new(shutdown_token(), || anyhow::anyhow!("Cancelled!")), ) .await .context("list prefixes")?; @@ -880,6 +884,7 @@ impl RemoteTimelineClient { FAILED_UPLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, "delete_objects", + backoff::Cancel::new(shutdown_token(), || anyhow::anyhow!("Cancelled!")), ) .await .context("delete_objects")?; @@ -901,6 +906,7 @@ impl RemoteTimelineClient { FAILED_UPLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, "delete_index", + backoff::Cancel::new(shutdown_token(), || anyhow::anyhow!("Cancelled")), ) .await .context("delete_index")?; @@ -1134,14 +1140,13 @@ impl RemoteTimelineClient { } // sleep until it's time to retry, or we're cancelled - tokio::select! { - _ = task_mgr::shutdown_watcher() => { }, - _ = exponential_backoff( - retries, - DEFAULT_BASE_BACKOFF_SECONDS, - DEFAULT_MAX_BACKOFF_SECONDS, - ) => { }, - }; + exponential_backoff( + retries, + DEFAULT_BASE_BACKOFF_SECONDS, + DEFAULT_MAX_BACKOFF_SECONDS, + &shutdown_token(), + ) + .await; } } } diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index 7426ae10e9..2cb33f07c9 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -11,6 +11,7 @@ use std::time::Duration; use anyhow::{anyhow, Context}; use tokio::fs; use tokio::io::AsyncWriteExt; +use tokio_util::sync::CancellationToken; use utils::{backoff, crashsafe}; use crate::config::PageServerConf; @@ -280,6 +281,10 @@ where FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, description, + // TODO: use a cancellation token (https://github.com/neondatabase/neon/issues/5066) + backoff::Cancel::new(CancellationToken::new(), || -> DownloadError { + unreachable!() + }), ) .await } diff --git 
a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index d52b29f175..4eef0f9eff 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -17,7 +17,7 @@ use crate::metrics::{ WALRECEIVER_ACTIVE_MANAGERS, WALRECEIVER_BROKER_UPDATES, WALRECEIVER_CANDIDATES_ADDED, WALRECEIVER_CANDIDATES_REMOVED, WALRECEIVER_SWITCHES, }; -use crate::task_mgr::TaskKind; +use crate::task_mgr::{shutdown_token, TaskKind}; use crate::tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline}; use anyhow::Context; use chrono::{NaiveDateTime, Utc}; @@ -211,11 +211,14 @@ async fn subscribe_for_timeline_updates( id: TenantTimelineId, ) -> Streaming<SafekeeperTimelineInfo> { let mut attempt = 0; + let cancel = shutdown_token(); + loop { exponential_backoff( attempt, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, + &cancel, ) .await; attempt += 1;