From 728052bd2eac182107942f060dbeef3fe1d7b0a3 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Tue, 21 Jan 2025 23:16:53 +0100 Subject: [PATCH] pausable_failpoint: add ability to provide a cancel flag, similar to what we have for sleep --- libs/utils/src/failpoint_support.rs | 54 +++++++++++++++++++++-------- pageserver/src/tenant/tasks.rs | 7 ++-- 2 files changed, 42 insertions(+), 19 deletions(-) diff --git a/libs/utils/src/failpoint_support.rs b/libs/utils/src/failpoint_support.rs index 701ba2d42c..272c6ebb26 100644 --- a/libs/utils/src/failpoint_support.rs +++ b/libs/utils/src/failpoint_support.rs @@ -11,31 +11,55 @@ use tracing::*; /// Declare a failpoint that can use to `pause` failpoint action. /// We don't want to block the executor thread, hence, spawn_blocking + await. +/// +/// Optionally pass a cancellation token, and this failpoint will drop out of +/// its pause when the cancellation token fires. This is useful for testing +/// cases where we would like to block something, but test its clean shutdown behavior. +/// The macro evaluates to a Result in that case, where Ok(()) is the case +/// where the failpoint was not paused, and Err() is the case where cancellation +/// token fired while evaluating the failpoint. +/// +/// Remember to unpause the failpoint in the test; until that happens, one of the +/// limited number of spawn_blocking thread pool threads is leaked. #[macro_export] macro_rules! pausable_failpoint { - ($name:literal) => { + ($name:literal) => {{ if cfg!(feature = "testing") { - tokio::task::spawn_blocking({ - let current = tracing::Span::current(); + let cancel = ::tokio_util::sync::CancellationToken::new(); + let _ = $crate::pausable_failpoint!($name, &cancel); + } + }}; + ($name:literal, $cancel:expr) => {{ + if cfg!(feature = "testing") { + let failpoint_fut = ::tokio::task::spawn_blocking({ + let current = ::tracing::Span::current(); move || { let _entered = current.entered(); - tracing::info!("at failpoint {}", $name); - fail::fail_point!($name); + ::tracing::info!("at failpoint {}", $name); + ::fail::fail_point!($name); + } + }); + let cancel_fut = async move { + $cancel.cancelled().await; + }; + ::tokio::select! { + res = failpoint_fut => { + res.expect("spawn_blocking"); + // continue with execution + Ok(()) + }, + _ = cancel_fut => { + Err(()) } - }) - .await - .expect("spawn_blocking"); - } - }; - ($name:literal, $cond:expr) => { - if cfg!(feature = "testing") { - if $cond { - pausable_failpoint!($name) } + } else { + Ok(()) } - }; + }}; } +pub use pausable_failpoint; + /// use with fail::cfg("$name", "return(2000)") /// /// The effect is similar to a "sleep(2000)" action, i.e. we sleep for the diff --git a/pageserver/src/tenant/tasks.rs b/pageserver/src/tenant/tasks.rs index 0118a5ce5f..3725e2f7fc 100644 --- a/pageserver/src/tenant/tasks.rs +++ b/pageserver/src/tenant/tasks.rs @@ -67,10 +67,9 @@ pub(crate) async fn concurrent_background_tasks_rate_limit_permit( ) -> tokio::sync::SemaphorePermit<'static> { let _guard = crate::metrics::BACKGROUND_LOOP_SEMAPHORE.measure_acquisition(loop_kind); - pausable_failpoint!( - "initial-size-calculation-permit-pause", - loop_kind == BackgroundLoopKind::InitialLogicalSizeCalculation - ); + if loop_kind == BackgroundLoopKind::InitialLogicalSizeCalculation { + pausable_failpoint!("initial-size-calculation-permit-pause"); + } // TODO: assert that we run on BACKGROUND_RUNTIME; requires tokio_unstable Handle::id(); match CONCURRENT_BACKGROUND_TASKS.acquire().await {