From 8c2ff87f1a23688e1d4798f365d28e03b79c382b Mon Sep 17 00:00:00 2001 From: John Spray Date: Tue, 22 Aug 2023 12:22:24 +0100 Subject: [PATCH] libs: make backoff::retry() take a cancellation token --- Cargo.lock | 1 + libs/utils/Cargo.toml | 1 + libs/utils/src/backoff.rs | 50 +++++++++++++++++-- pageserver/src/tenant/delete.rs | 7 +++ .../src/tenant/remote_timeline_client.rs | 9 +++- .../tenant/remote_timeline_client/download.rs | 5 ++ .../walreceiver/connection_manager.rs | 5 +- 7 files changed, 71 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 23741e2a3d..01396c4230 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4915,6 +4915,7 @@ dependencies = [ "thiserror", "tokio", "tokio-stream", + "tokio-util", "tracing", "tracing-error", "tracing-subscriber", diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml index f01131540e..7315cda50e 100644 --- a/libs/utils/Cargo.toml +++ b/libs/utils/Cargo.toml @@ -26,6 +26,7 @@ serde_json.workspace = true signal-hook.workspace = true thiserror.workspace = true tokio.workspace = true +tokio-util.workspace = true tracing.workspace = true tracing-error.workspace = true tracing-subscriber = { workspace = true, features = ["json", "registry"] } diff --git a/libs/utils/src/backoff.rs b/libs/utils/src/backoff.rs index 4507efe647..859bec940f 100644 --- a/libs/utils/src/backoff.rs +++ b/libs/utils/src/backoff.rs @@ -1,18 +1,27 @@ use std::fmt::{Debug, Display}; use futures::Future; +use tokio_util::sync::CancellationToken; pub const DEFAULT_BASE_BACKOFF_SECONDS: f64 = 0.1; pub const DEFAULT_MAX_BACKOFF_SECONDS: f64 = 3.0; -pub async fn exponential_backoff(n: u32, base_increment: f64, max_seconds: f64) { +pub async fn exponential_backoff( + n: u32, + base_increment: f64, + max_seconds: f64, + cancel: &CancellationToken, +) { let backoff_duration_seconds = exponential_backoff_duration_seconds(n, base_increment, max_seconds); if backoff_duration_seconds > 0.0 { tracing::info!( "Backoff: waiting {backoff_duration_seconds} seconds before processing with the task", ); - tokio::time::sleep(std::time::Duration::from_secs_f64(backoff_duration_seconds)).await; + tokio::select! { + _ = tokio::time::sleep(std::time::Duration::from_secs_f64(backoff_duration_seconds)) => {}, + _ = cancel.cancelled() => {} + } } } @@ -24,28 +33,57 @@ pub fn exponential_backoff_duration_seconds(n: u32, base_increment: f64, max_sec } } +/// Configure cancellation for a retried operation: when to cancel (the token), and +/// what kind of error to return on cancellation +pub struct Cancel +where + E: Display + Debug + 'static, + CF: Fn() -> E, +{ + token: CancellationToken, + on_cancel: CF, +} + +impl Cancel +where + E: Display + Debug + 'static, + CF: Fn() -> E, +{ + pub fn new(token: CancellationToken, on_cancel: CF) -> Self { + Self { token, on_cancel } + } +} + /// retries passed operation until one of the following conditions are met: /// Encountered error is considered as permanent (non-retryable) /// Retries have been exhausted. /// `is_permanent` closure should be used to provide distinction between permanent/non-permanent errors /// When attempts cross `warn_threshold` function starts to emit log warnings. /// `description` argument is added to log messages. Its value should identify the `op` is doing -pub async fn retry( +/// `cancel` argument is required: any time we are looping on retry, we should be using a CancellationToken +/// to drop out promptly on shutdown. +pub async fn retry( mut op: O, is_permanent: impl Fn(&E) -> bool, warn_threshold: u32, max_retries: u32, description: &str, + cancel: Cancel, ) -> Result where // Not std::error::Error because anyhow::Error doesnt implement it. // For context see https://github.com/dtolnay/anyhow/issues/63 - E: Display + Debug, + E: Display + Debug + 'static, O: FnMut() -> F, F: Future>, + CF: Fn() -> E, { let mut attempts = 0; loop { + if cancel.token.is_cancelled() { + return Err((cancel.on_cancel)()); + } + let result = op().await; match result { Ok(_) => { @@ -80,6 +118,7 @@ where attempts, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, + &cancel.token, ) .await; attempts += 1; @@ -132,6 +171,7 @@ mod tests { 1, 1, "work", + Cancel::new(CancellationToken::new(), || -> io::Error { unreachable!() }), ) .await; @@ -157,6 +197,7 @@ mod tests { 2, 2, "work", + Cancel::new(CancellationToken::new(), || -> io::Error { unreachable!() }), ) .await .unwrap(); @@ -179,6 +220,7 @@ mod tests { 2, 2, "work", + Cancel::new(CancellationToken::new(), || -> io::Error { unreachable!() }), ) .await .unwrap_err(); diff --git a/pageserver/src/tenant/delete.rs b/pageserver/src/tenant/delete.rs index de509cd3de..f5d4142f33 100644 --- a/pageserver/src/tenant/delete.rs +++ b/pageserver/src/tenant/delete.rs @@ -7,6 +7,7 @@ use anyhow::Context; use pageserver_api::models::TenantState; use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath}; use tokio::sync::OwnedMutexGuard; +use tokio_util::sync::CancellationToken; use tracing::{error, info, instrument, warn, Instrument, Span}; use utils::{ @@ -82,6 +83,8 @@ async fn create_remote_delete_mark( FAILED_UPLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, "mark_upload", + // TODO: thread a cancellation token into this code path + backoff::Cancel::new(CancellationToken::new(), || unreachable!()), ) .await .context("mark_upload")?; @@ -171,6 +174,8 @@ async fn remove_tenant_remote_delete_mark( FAILED_UPLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, "remove_tenant_remote_delete_mark", + // TODO: thread a cancellation token into this code path + backoff::Cancel::new(CancellationToken::new(), || unreachable!()), ) .await .context("remove_tenant_remote_delete_mark")?; @@ -252,6 +257,8 @@ pub(crate) async fn remote_delete_mark_exists( SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS, SHOULD_RESUME_DELETION_FETCH_MARK_ATTEMPTS, "fetch_tenant_deletion_mark", + // TODO: thread a cancellation token into this code path + backoff::Cancel::new(CancellationToken::new(), || unreachable!()), ) .await; diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 5c602cc75f..909a90178a 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -210,6 +210,7 @@ use chrono::{NaiveDateTime, Utc}; // re-export these pub use download::{is_temp_download_file, list_remote_timelines}; use scopeguard::ScopeGuard; +use tokio_util::sync::CancellationToken; use utils::backoff::{ self, exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, }; @@ -231,6 +232,7 @@ use crate::metrics::{ RemoteTimelineClientMetricsCallTrackSize, REMOTE_ONDEMAND_DOWNLOADED_BYTES, REMOTE_ONDEMAND_DOWNLOADED_LAYERS, }; +use crate::task_mgr::shutdown_token; use crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id; use crate::tenant::remote_timeline_client::index::LayerFileMetadata; use crate::{ @@ -748,7 +750,7 @@ impl RemoteTimelineClient { pausable_failpoint!("persist_deleted_index_part"); backoff::retry( - || async { + || { upload::upload_index_part( self.conf, &self.storage_impl, @@ -756,7 +758,6 @@ impl RemoteTimelineClient { &self.timeline_id, &index_part_with_deleted_at, ) - .await }, |_e| false, 1, @@ -765,6 +766,7 @@ impl RemoteTimelineClient { // when executed as part of tenant deletion this happens in the background 2, "persist_index_part_with_deleted_flag", + backoff::Cancel::new(CancellationToken::new(), || unreachable!()), ) .await?; @@ -838,6 +840,7 @@ impl RemoteTimelineClient { FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, "list_prefixes", + backoff::Cancel::new(shutdown_token(), || anyhow::anyhow!("Cancelled!")), ) .await .context("list prefixes")?; @@ -1085,12 +1088,14 @@ impl RemoteTimelineClient { } // sleep until it's time to retry, or we're cancelled + let cancel = shutdown_token(); tokio::select! { _ = task_mgr::shutdown_watcher() => { }, _ = exponential_backoff( retries, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, + &cancel ) => { }, }; } diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index 831dc62f33..3f5074e7d1 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -11,6 +11,7 @@ use std::time::Duration; use anyhow::{anyhow, Context}; use tokio::fs; use tokio::io::AsyncWriteExt; +use tokio_util::sync::CancellationToken; use utils::{backoff, crashsafe}; use crate::config::PageServerConf; @@ -266,6 +267,10 @@ where FAILED_DOWNLOAD_WARN_THRESHOLD, FAILED_REMOTE_OP_RETRIES, description, + // TODO: pass a cancellation token into this function + backoff::Cancel::new(CancellationToken::new(), || -> DownloadError { + unreachable!() + }), ) .await } diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index d52b29f175..4eef0f9eff 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -17,7 +17,7 @@ use crate::metrics::{ WALRECEIVER_ACTIVE_MANAGERS, WALRECEIVER_BROKER_UPDATES, WALRECEIVER_CANDIDATES_ADDED, WALRECEIVER_CANDIDATES_REMOVED, WALRECEIVER_SWITCHES, }; -use crate::task_mgr::TaskKind; +use crate::task_mgr::{shutdown_token, TaskKind}; use crate::tenant::{debug_assert_current_span_has_tenant_and_timeline_id, Timeline}; use anyhow::Context; use chrono::{NaiveDateTime, Utc}; @@ -211,11 +211,14 @@ async fn subscribe_for_timeline_updates( id: TenantTimelineId, ) -> Streaming { let mut attempt = 0; + let cancel = shutdown_token(); + loop { exponential_backoff( attempt, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, + &cancel, ) .await; attempt += 1;