pausable_failpoint: add ability to provide a cancel flag, similar to what we have for sleep

This commit is contained in:
Christian Schwarz
2025-01-21 23:16:53 +01:00
parent 361210f8dc
commit 728052bd2e
2 changed files with 42 additions and 19 deletions

View File

@@ -11,31 +11,55 @@ use tracing::*;
/// Declare a failpoint that can use to `pause` failpoint action.
/// We don't want to block the executor thread, hence, spawn_blocking + await.
///
/// Optionally pass a cancellation token, and this failpoint will drop out of
/// its pause when the cancellation token fires. This is useful for testing
/// cases where we would like to block something, but test its clean shutdown behavior.
/// The macro evaluates to a Result in that case, where Ok(()) is the case
/// where the failpoint was not paused, and Err() is the case where cancellation
/// token fired while evaluating the failpoint.
///
/// Remember to unpause the failpoint in the test; until that happens, one of the
/// limited number of spawn_blocking thread pool threads is leaked.
#[macro_export]
macro_rules! pausable_failpoint {
($name:literal) => {
($name:literal) => {{
if cfg!(feature = "testing") {
tokio::task::spawn_blocking({
let current = tracing::Span::current();
let cancel = ::tokio_util::sync::CancellationToken::new();
let _ = $crate::pausable_failpoint!($name, &cancel);
}
}};
($name:literal, $cancel:expr) => {{
if cfg!(feature = "testing") {
let failpoint_fut = ::tokio::task::spawn_blocking({
let current = ::tracing::Span::current();
move || {
let _entered = current.entered();
tracing::info!("at failpoint {}", $name);
fail::fail_point!($name);
::tracing::info!("at failpoint {}", $name);
::fail::fail_point!($name);
}
});
let cancel_fut = async move {
$cancel.cancelled().await;
};
::tokio::select! {
res = failpoint_fut => {
res.expect("spawn_blocking");
// continue with execution
Ok(())
},
_ = cancel_fut => {
Err(())
}
})
.await
.expect("spawn_blocking");
}
};
($name:literal, $cond:expr) => {
if cfg!(feature = "testing") {
if $cond {
pausable_failpoint!($name)
}
} else {
Ok(())
}
};
}};
}
pub use pausable_failpoint;
/// use with fail::cfg("$name", "return(2000)")
///
/// The effect is similar to a "sleep(2000)" action, i.e. we sleep for the

View File

@@ -67,10 +67,9 @@ pub(crate) async fn concurrent_background_tasks_rate_limit_permit(
) -> tokio::sync::SemaphorePermit<'static> {
let _guard = crate::metrics::BACKGROUND_LOOP_SEMAPHORE.measure_acquisition(loop_kind);
pausable_failpoint!(
"initial-size-calculation-permit-pause",
loop_kind == BackgroundLoopKind::InitialLogicalSizeCalculation
);
if loop_kind == BackgroundLoopKind::InitialLogicalSizeCalculation {
pausable_failpoint!("initial-size-calculation-permit-pause");
}
// TODO: assert that we run on BACKGROUND_RUNTIME; requires tokio_unstable Handle::id();
match CONCURRENT_BACKGROUND_TASKS.acquire().await {