Great progress, although returning `Pin<Box<dyn Future>>` hurts performance; let's revisit it.

This commit is contained in:
Christian Schwarz
2025-07-11 15:11:00 +02:00
parent 01cd326153
commit 3a3062d236
10 changed files with 506 additions and 156 deletions

2
Cargo.lock generated
View File

@@ -3857,6 +3857,7 @@ name = "neon_failpoint"
version = "0.1.0"
dependencies = [
"anyhow",
"either",
"once_cell",
"parking_lot 0.12.1",
"rand 0.8.5",
@@ -8180,6 +8181,7 @@ dependencies = [
"const_format",
"criterion",
"diatomic-waker",
"either",
"futures",
"git-version",
"hex",

View File

@@ -13,6 +13,7 @@ regex = { workspace = true }
once_cell = { workspace = true }
parking_lot = { workspace = true }
rand = { workspace = true }
either = { workspace = true }
[dev-dependencies]
tracing-subscriber = { workspace = true, features = ["fmt"] }
@@ -23,4 +24,4 @@ testing = []
[[example]]
name = "context_demo"
required-features = ["testing"]
required-features = ["testing"]

View File

@@ -9,8 +9,11 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use std::pin::Pin;
use std::future::Future;
use anyhow::Result;
use either::Either;
use once_cell::sync::Lazy;
use parking_lot::RwLock;
use rand::Rng;
@@ -170,19 +173,19 @@ pub fn has_failpoints() -> bool {
}
/// Execute a failpoint with optional context
pub async fn failpoint(name: &str, context: Option<&FailpointContext>) -> FailpointResult {
failpoint_with_cancellation(name, context, &CancellationToken::new()).await
pub fn failpoint(name: &str, context: Option<&FailpointContext>) -> Either<FailpointResult, Pin<Box<dyn Future<Output = FailpointResult> + Send>>> {
failpoint_with_cancellation(name, context, &CancellationToken::new())
}
/// Execute a failpoint with cancellation support
pub async fn failpoint_with_cancellation(
pub fn failpoint_with_cancellation(
name: &str,
context: Option<&FailpointContext>,
cancel_token: &CancellationToken,
) -> FailpointResult {
) -> Either<FailpointResult, Pin<Box<dyn Future<Output = FailpointResult> + Send>>> {
// Only check failpoints if testing feature is enabled
if !cfg!(feature = "testing") {
return FailpointResult::Continue;
return Either::Left(FailpointResult::Continue);
}
let config = {
@@ -191,13 +194,13 @@ pub async fn failpoint_with_cancellation(
};
let Some(config) = config else {
return FailpointResult::Continue;
return Either::Left(FailpointResult::Continue);
};
// Check context matchers if provided
if let (Some(matchers), Some(ctx)) = (&config.context_matchers, context) {
if !matches_context(matchers, ctx) {
return FailpointResult::Continue;
return Either::Left(FailpointResult::Continue);
}
}
@@ -206,7 +209,7 @@ pub async fn failpoint_with_cancellation(
// Check if we've hit the max count
if let Some(max_count) = config.action_spec.max_count {
if config.trigger_count >= max_count {
return FailpointResult::Continue;
return Either::Left(FailpointResult::Continue);
}
}
@@ -214,7 +217,7 @@ pub async fn failpoint_with_cancellation(
let mut rng = rand::thread_rng();
let roll: u8 = rng.gen_range(1..=100);
if roll > probability {
return FailpointResult::Continue;
return Either::Left(FailpointResult::Continue);
}
// Increment trigger count
@@ -228,24 +231,36 @@ pub async fn failpoint_with_cancellation(
tracing::info!("Hit failpoint: {}", name);
execute_action(name, &config.action_spec, context, cancel_token).await
execute_action(name, &config.action_spec, context, cancel_token)
}
/// Execute a specific action (used for recursive execution in probability-based actions)
fn execute_action<'a>(
name: &'a str,
action_spec: &'a FailpointActionSpec,
_context: Option<&'a FailpointContext>,
cancel_token: &'a CancellationToken,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = FailpointResult> + Send + 'a>> {
Box::pin(async move {
match &action_spec.action {
FailpointAction::Off => FailpointResult::Continue,
FailpointAction::Return(value) => {
tracing::info!("Failpoint {} returning: {:?}", name, value);
FailpointResult::Return(value.clone())
}
FailpointAction::Sleep(millis) => {
fn execute_action(
name: &str,
action_spec: &FailpointActionSpec,
_context: Option<&FailpointContext>,
cancel_token: &CancellationToken,
) -> Either<FailpointResult, Pin<Box<dyn Future<Output = FailpointResult> + Send>>> {
match &action_spec.action {
FailpointAction::Off => Either::Left(FailpointResult::Continue),
FailpointAction::Return(value) => {
tracing::info!("Failpoint {} returning: {:?}", name, value);
Either::Left(FailpointResult::Return(value.clone()))
}
FailpointAction::Exit => {
tracing::info!("Failpoint {} exiting process", name);
std::process::exit(1);
}
FailpointAction::Panic(message) => {
tracing::error!("Failpoint {} panicking with message: {}", name, message);
panic!("Failpoint panicked: {}", message);
}
FailpointAction::Sleep(millis) => {
let millis = *millis;
let name = name.to_string();
let cancel_token = cancel_token.clone();
Either::Right(Box::pin(async move {
tracing::info!("Failpoint {} sleeping for {}ms", name, millis);
// Create a notifier for this task
@@ -254,20 +269,20 @@ fn execute_action<'a>(
// Add the notifier to the failpoint configuration
{
let mut failpoints = FAILPOINTS.write();
if let Some(fp_config) = failpoints.get_mut(name) {
if let Some(fp_config) = failpoints.get_mut(&name) {
fp_config.notifiers.push(notifier.clone());
}
}
// Create cleanup guard to remove notifier when done
let _guard = NotifierCleanupGuard {
failpoint_name: name.to_string(),
failpoint_name: name.clone(),
notifier: notifier.clone(),
};
// Sleep with cancellation support
tokio::select! {
_ = tokio::time::sleep(Duration::from_millis(*millis)) => {
_ = tokio::time::sleep(Duration::from_millis(millis)) => {
tracing::info!("Failpoint {} finished sleeping", name);
FailpointResult::Continue
}
@@ -280,8 +295,13 @@ fn execute_action<'a>(
FailpointResult::Continue
}
}
}
FailpointAction::Pause => {
}))
}
FailpointAction::Pause => {
let name = name.to_string();
let cancel_token = cancel_token.clone();
Either::Right(Box::pin(async move {
tracing::info!("Failpoint {} pausing", name);
// Create a notifier for this task
@@ -290,14 +310,14 @@ fn execute_action<'a>(
// Add the notifier to the failpoint configuration
{
let mut failpoints = FAILPOINTS.write();
if let Some(fp_config) = failpoints.get_mut(name) {
if let Some(fp_config) = failpoints.get_mut(&name) {
fp_config.notifiers.push(notifier.clone());
}
}
// Create cleanup guard to remove notifier when done
let _guard = NotifierCleanupGuard {
failpoint_name: name.to_string(),
failpoint_name: name.clone(),
notifier: notifier.clone(),
};
@@ -312,17 +332,9 @@ fn execute_action<'a>(
FailpointResult::Continue
}
}
}
FailpointAction::Exit => {
tracing::info!("Failpoint {} exiting process", name);
std::process::exit(1);
}
FailpointAction::Panic(message) => {
tracing::error!("Failpoint {} panicking with message: {}", name, message);
panic!("Failpoint panicked: {}", message);
}
}))
}
})
}
}
/// Parse an action string into a FailpointActionSpec
@@ -439,17 +451,27 @@ mod tests {
use super::*;
use tokio::time::timeout;
/// Test helper that collapses the `Either` returned by the failpoint entry
/// points into a plain `FailpointResult`.
///
/// `Either::Left` already carries the result and is returned as-is;
/// `Either::Right` carries a boxed future that is awaited to obtain it.
async fn await_failpoint_result(
    either: Either<FailpointResult, Pin<Box<dyn Future<Output = FailpointResult> + Send>>>,
) -> FailpointResult {
    // Guard clause: the synchronous case needs no awaiting at all.
    let pending = match either {
        Either::Left(ready) => return ready,
        Either::Right(pending) => pending,
    };
    pending.await
}
#[tokio::test]
async fn test_failpoint_off() {
configure_failpoint("test_off", "off").unwrap();
let result = failpoint("test_off", None).await;
let result = await_failpoint_result(failpoint("test_off", None)).await;
matches!(result, FailpointResult::Continue);
}
#[tokio::test]
async fn test_failpoint_return() {
configure_failpoint("test_return", "return(42)").unwrap();
let result = failpoint("test_return", None).await;
let result = await_failpoint_result(failpoint("test_return", None)).await;
if let FailpointResult::Return(Some(value)) = result {
assert_eq!(value, "42");
} else {
@@ -461,7 +483,7 @@ mod tests {
async fn test_failpoint_sleep() {
configure_failpoint("test_sleep", "sleep(10)").unwrap();
let start = std::time::Instant::now();
let result = failpoint("test_sleep", None).await;
let result = await_failpoint_result(failpoint("test_sleep", None)).await;
let duration = start.elapsed();
matches!(result, FailpointResult::Continue);
@@ -480,7 +502,7 @@ mod tests {
cancel_token_clone.cancel();
});
let result = failpoint_with_cancellation("test_pause", None, &cancel_token).await;
let result = await_failpoint_result(failpoint_with_cancellation("test_pause", None, &cancel_token)).await;
matches!(result, FailpointResult::Cancelled);
}
@@ -492,7 +514,9 @@ mod tests {
let start = std::time::Instant::now();
// Start a task that hits the failpoint
let task = tokio::spawn(async move { failpoint(failpoint_name, None).await });
let task = tokio::spawn(async move {
await_failpoint_result(failpoint(failpoint_name, None)).await
});
// Give it time to hit the failpoint and pause
tokio::time::sleep(Duration::from_millis(10)).await;
@@ -522,7 +546,9 @@ mod tests {
configure_failpoint(failpoint_name, "pause").unwrap();
// Start a task that hits the failpoint
let task = tokio::spawn(async move { failpoint(failpoint_name, None).await });
let task = tokio::spawn(async move {
await_failpoint_result(failpoint(failpoint_name, None)).await
});
// Give it time to hit the failpoint and pause
tokio::time::sleep(Duration::from_millis(10)).await;
@@ -548,7 +574,9 @@ mod tests {
let start = std::time::Instant::now();
// Start a task that hits the failpoint
let task = tokio::spawn(async move { failpoint(failpoint_name, None).await });
let task = tokio::spawn(async move {
await_failpoint_result(failpoint(failpoint_name, None)).await
});
// Give it time to hit the failpoint and start sleeping
tokio::time::sleep(Duration::from_millis(10)).await;
@@ -579,7 +607,9 @@ mod tests {
let start = std::time::Instant::now();
// Start a task that hits the failpoint
let task = tokio::spawn(async move { failpoint(failpoint_name, None).await });
let task = tokio::spawn(async move {
await_failpoint_result(failpoint(failpoint_name, None)).await
});
// Give it time to hit the failpoint and start sleeping
tokio::time::sleep(Duration::from_millis(10)).await;
@@ -608,7 +638,9 @@ mod tests {
configure_failpoint(failpoint_name, "pause").unwrap();
// Start a task that hits the failpoint
let task = tokio::spawn(async move { failpoint(failpoint_name, None).await });
let task = tokio::spawn(async move {
await_failpoint_result(failpoint(failpoint_name, None)).await
});
// Give it time to hit the failpoint and pause
tokio::time::sleep(Duration::from_millis(10)).await;
@@ -632,11 +664,17 @@ mod tests {
configure_failpoint(failpoint_name, "pause").unwrap();
// Start multiple tasks that hit the same failpoint
let task1 = tokio::spawn(async move { failpoint(failpoint_name, None).await });
let task1 = tokio::spawn(async move {
await_failpoint_result(failpoint(failpoint_name, None)).await
});
let task2 = tokio::spawn(async move { failpoint(failpoint_name, None).await });
let task2 = tokio::spawn(async move {
await_failpoint_result(failpoint(failpoint_name, None)).await
});
let task3 = tokio::spawn(async move { failpoint(failpoint_name, None).await });
let task3 = tokio::spawn(async move {
await_failpoint_result(failpoint(failpoint_name, None)).await
});
// Give them time to hit the failpoint and pause
tokio::time::sleep(Duration::from_millis(10)).await;
@@ -672,7 +710,9 @@ mod tests {
configure_failpoint(failpoint_name, "pause").unwrap();
// Start a task that will hit the failpoint
let task = tokio::spawn(async move { failpoint(failpoint_name, None).await });
let task = tokio::spawn(async move {
await_failpoint_result(failpoint(failpoint_name, None)).await
});
// Rapidly change the configuration multiple times
for i in 0..10 {
@@ -709,7 +749,7 @@ mod tests {
configure_failpoint_with_context("test_context", "return(matched)", matchers).unwrap();
let result = failpoint("test_context", Some(&context)).await;
let result = await_failpoint_result(failpoint("test_context", Some(&context))).await;
if let FailpointResult::Return(Some(value)) = result {
assert_eq!(value, "matched");
} else {
@@ -721,7 +761,7 @@ mod tests {
wrong_context.insert("user_id".to_string(), "456".to_string());
wrong_context.insert("operation".to_string(), "write".to_string());
let result = failpoint("test_context", Some(&wrong_context)).await;
let result = await_failpoint_result(failpoint("test_context", Some(&wrong_context))).await;
matches!(result, FailpointResult::Continue);
}
@@ -732,7 +772,9 @@ mod tests {
let result = tokio::task::spawn_blocking(|| {
std::panic::catch_unwind(|| {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async { failpoint("test_panic", None).await })
rt.block_on(async {
await_failpoint_result(failpoint("test_panic", None)).await
})
})
})
.await
@@ -754,7 +796,7 @@ mod tests {
configure_failpoint("test_prob_simple", "100%return(always)").unwrap();
// 100% probability should always trigger
let result = failpoint("test_prob_simple", None).await;
let result = await_failpoint_result(failpoint("test_prob_simple", None)).await;
if let FailpointResult::Return(Some(value)) = result {
assert_eq!(value, "always");
} else {
@@ -763,7 +805,7 @@ mod tests {
// 0% probability should never trigger
configure_failpoint("test_prob_never", "0%return(never)").unwrap();
let result = failpoint("test_prob_never", None).await;
let result = await_failpoint_result(failpoint("test_prob_never", None)).await;
matches!(result, FailpointResult::Continue);
}
@@ -772,14 +814,14 @@ mod tests {
configure_failpoint("test_prob_count", "100%2*return(limited)").unwrap();
// First two calls should trigger (100% probability, max 2 times)
let result1 = failpoint("test_prob_count", None).await;
let result1 = await_failpoint_result(failpoint("test_prob_count", None)).await;
if let FailpointResult::Return(Some(value)) = result1 {
assert_eq!(value, "limited");
} else {
panic!("Expected return result on first call");
}
let result2 = failpoint("test_prob_count", None).await;
let result2 = await_failpoint_result(failpoint("test_prob_count", None)).await;
if let FailpointResult::Return(Some(value)) = result2 {
assert_eq!(value, "limited");
} else {
@@ -787,7 +829,7 @@ mod tests {
}
// Third call should not trigger (count limit exceeded)
let result3 = failpoint("test_prob_count", None).await;
let result3 = await_failpoint_result(failpoint("test_prob_count", None)).await;
matches!(result3, FailpointResult::Continue);
}
@@ -796,7 +838,7 @@ mod tests {
configure_failpoint("test_prob_sleep", "100%sleep(10)").unwrap();
let start = std::time::Instant::now();
let result = failpoint("test_prob_sleep", None).await;
let result = await_failpoint_result(failpoint("test_prob_sleep", None)).await;
let duration = start.elapsed();
matches!(result, FailpointResult::Continue);
@@ -885,7 +927,7 @@ mod tests {
configure_failpoint("test_audit_count", "100%1*return(limited)").unwrap();
// Test the probability-based return
let result = failpoint("test_audit_prob", None).await;
let result = await_failpoint_result(failpoint("test_audit_prob", None)).await;
if let FailpointResult::Return(Some(value)) = result {
assert_eq!(value, "42");
} else {
@@ -893,7 +935,7 @@ mod tests {
}
// Test the count-limited probability
let result1 = failpoint("test_audit_count", None).await;
let result1 = await_failpoint_result(failpoint("test_audit_count", None)).await;
if let FailpointResult::Return(Some(value)) = result1 {
assert_eq!(value, "limited");
} else {
@@ -901,7 +943,7 @@ mod tests {
}
// Second call should not trigger (count limit exceeded)
let result2 = failpoint("test_audit_count", None).await;
let result2 = await_failpoint_result(failpoint("test_audit_count", None)).await;
matches!(result2, FailpointResult::Continue);
}
}

View File

@@ -5,40 +5,141 @@
macro_rules! fail_point {
($name:literal) => {{
if cfg!(feature = "testing") {
match $crate::failpoint($name, None).await {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(None) => {
return;
match $crate::failpoint($name, None) {
either::Either::Left(result) => {
match result {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(_) => {
panic!("failpoint was configured with return(X) but Rust code does not pass a closure to map X to a return value");
},
$crate::FailpointResult::Cancelled => {},
}
},
$crate::FailpointResult::Return(Some(value)) => {
panic!("failpoint was configured with return(X) but Rust code does not pass a closure to map X to a return value");
either::Either::Right(future) => {
match future.await {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(_) => {
panic!("failpoint was configured with return(X) but Rust code does not pass a closure to map X to a return value");
},
$crate::FailpointResult::Cancelled => {},
}
},
$crate::FailpointResult::Cancelled => {},
}
}
}};
($name:literal, $closure:expr) => {{
if cfg!(feature = "testing") {
match $crate::failpoint($name, None).await {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
let closure = $closure;
return closure(value);
match $crate::failpoint($name, None) {
either::Either::Left(result) => {
match result {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
let closure = $closure;
return closure(value);
},
$crate::FailpointResult::Cancelled => {},
}
},
either::Either::Right(future) => {
match future.await {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
let closure = $closure;
return closure(value);
},
$crate::FailpointResult::Cancelled => {},
}
},
$crate::FailpointResult::Cancelled => {},
}
}
}};
($name:literal, $condition:expr, $closure:expr) => {{
if cfg!(feature = "testing") {
if $condition {
match $crate::failpoint($name, None).await {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
let closure = $closure;
return closure(value);
match $crate::failpoint($name, None) {
either::Either::Left(result) => {
match result {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
let closure = $closure;
return closure(value);
},
$crate::FailpointResult::Cancelled => {},
}
},
either::Either::Right(future) => {
match future.await {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
let closure = $closure;
return closure(value);
},
$crate::FailpointResult::Cancelled => {},
}
},
}
}
}
}};
}
/// Simple failpoint macro - sync version that panics if async action is triggered
#[macro_export]
macro_rules! fail_point_sync {
($name:literal) => {{
if cfg!(feature = "testing") {
match $crate::failpoint($name, None) {
either::Either::Left(result) => {
match result {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(_) => {
panic!("failpoint was configured with return(X) but Rust code does not pass a closure to map X to a return value");
},
$crate::FailpointResult::Cancelled => {},
}
},
either::Either::Right(_) => {
panic!("failpoint '{}' triggered an async action (sleep/pause) but fail_point_sync! was used. Use fail_point! instead.", $name);
},
}
}
}};
($name:literal, $closure:expr) => {{
if cfg!(feature = "testing") {
match $crate::failpoint($name, None) {
either::Either::Left(result) => {
match result {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
let closure = $closure;
return closure(value);
},
$crate::FailpointResult::Cancelled => {},
}
},
either::Either::Right(_) => {
panic!("failpoint '{}' triggered an async action (sleep/pause) but fail_point_sync! was used. Use fail_point! instead.", $name);
},
}
}
}};
($name:literal, $condition:expr, $closure:expr) => {{
if cfg!(feature = "testing") {
if $condition {
match $crate::failpoint($name, None) {
either::Either::Left(result) => {
match result {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
let closure = $closure;
return closure(value);
},
$crate::FailpointResult::Cancelled => {},
}
},
either::Either::Right(_) => {
panic!("failpoint '{}' triggered an async action (sleep/pause) but fail_point_sync! was used. Use fail_point! instead.", $name);
},
$crate::FailpointResult::Cancelled => {},
}
}
}
@@ -50,40 +151,141 @@ macro_rules! fail_point {
macro_rules! fail_point_with_context {
($name:literal, $context:expr) => {{
if cfg!(feature = "testing") {
match $crate::failpoint($name, Some($context)).await {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(None) => {
return;
match $crate::failpoint($name, Some($context)) {
either::Either::Left(result) => {
match result {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(_) => {
panic!("failpoint was configured with return(X) but Rust code does not pass a closure to map X to a return value");
},
$crate::FailpointResult::Cancelled => {},
}
},
$crate::FailpointResult::Return(Some(value)) => {
panic!("failpoint was configured with return(X) but Rust code does not pass a closure to map X to a return value");
either::Either::Right(future) => {
match future.await {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(_) => {
panic!("failpoint was configured with return(X) but Rust code does not pass a closure to map X to a return value");
},
$crate::FailpointResult::Cancelled => {},
}
},
$crate::FailpointResult::Cancelled => {},
}
}
}};
($name:literal, $context:expr, $closure:expr) => {{
if cfg!(feature = "testing") {
match $crate::failpoint($name, Some($context)).await {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
let closure = $closure;
return closure(value);
match $crate::failpoint($name, Some($context)) {
either::Either::Left(result) => {
match result {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
let closure = $closure;
return closure(value);
},
$crate::FailpointResult::Cancelled => {},
}
},
either::Either::Right(future) => {
match future.await {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
let closure = $closure;
return closure(value);
},
$crate::FailpointResult::Cancelled => {},
}
},
$crate::FailpointResult::Cancelled => {},
}
}
}};
($name:literal, $context:expr, $condition:expr, $closure:expr) => {{
if cfg!(feature = "testing") {
if $condition {
match $crate::failpoint($name, Some($context)).await {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
let closure = $closure;
return closure(value);
match $crate::failpoint($name, Some($context)) {
either::Either::Left(result) => {
match result {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
let closure = $closure;
return closure(value);
},
$crate::FailpointResult::Cancelled => {},
}
},
either::Either::Right(future) => {
match future.await {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
let closure = $closure;
return closure(value);
},
$crate::FailpointResult::Cancelled => {},
}
},
}
}
}
}};
}
/// Failpoint macro with context support - sync version
#[macro_export]
macro_rules! fail_point_with_context_sync {
($name:literal, $context:expr) => {{
if cfg!(feature = "testing") {
match $crate::failpoint($name, Some($context)) {
either::Either::Left(result) => {
match result {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(_) => {
panic!("failpoint was configured with return(X) but Rust code does not pass a closure to map X to a return value");
},
$crate::FailpointResult::Cancelled => {},
}
},
either::Either::Right(_) => {
panic!("failpoint '{}' triggered an async action (sleep/pause) but fail_point_with_context_sync! was used. Use fail_point_with_context! instead.", $name);
},
}
}
}};
($name:literal, $context:expr, $closure:expr) => {{
if cfg!(feature = "testing") {
match $crate::failpoint($name, Some($context)) {
either::Either::Left(result) => {
match result {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
let closure = $closure;
return closure(value);
},
$crate::FailpointResult::Cancelled => {},
}
},
either::Either::Right(_) => {
panic!("failpoint '{}' triggered an async action (sleep/pause) but fail_point_with_context_sync! was used. Use fail_point_with_context! instead.", $name);
},
}
}
}};
($name:literal, $context:expr, $condition:expr, $closure:expr) => {{
if cfg!(feature = "testing") {
if $condition {
match $crate::failpoint($name, Some($context)) {
either::Either::Left(result) => {
match result {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
let closure = $closure;
return closure(value);
},
$crate::FailpointResult::Cancelled => {},
}
},
either::Either::Right(_) => {
panic!("failpoint '{}' triggered an async action (sleep/pause) but fail_point_with_context_sync! was used. Use fail_point_with_context! instead.", $name);
},
$crate::FailpointResult::Cancelled => {},
}
}
}
@@ -101,10 +303,21 @@ macro_rules! pausable_failpoint {
}};
($name:literal, $cancel:expr) => {{
if cfg!(feature = "testing") {
match $crate::failpoint_with_cancellation($name, None, $cancel).await {
$crate::FailpointResult::Continue => Ok(()),
$crate::FailpointResult::Return(_) => Ok(()),
$crate::FailpointResult::Cancelled => Err(()),
match $crate::failpoint_with_cancellation($name, None, $cancel) {
either::Either::Left(result) => {
match result {
$crate::FailpointResult::Continue => Ok(()),
$crate::FailpointResult::Return(_) => Ok(()),
$crate::FailpointResult::Cancelled => Err(()),
}
},
either::Either::Right(future) => {
match future.await {
$crate::FailpointResult::Continue => Ok(()),
$crate::FailpointResult::Return(_) => Ok(()),
$crate::FailpointResult::Cancelled => Err(()),
}
},
}
} else {
Ok(())
@@ -117,12 +330,22 @@ macro_rules! pausable_failpoint {
macro_rules! sleep_millis_async {
($name:literal) => {{
if cfg!(feature = "testing") {
$crate::failpoint($name, None).await;
match $crate::failpoint($name, None) {
either::Either::Left(_) => {},
either::Either::Right(future) => {
future.await;
},
}
}
}};
($name:literal, $cancel:expr) => {{
if cfg!(feature = "testing") {
$crate::failpoint_with_cancellation($name, None, $cancel).await;
match $crate::failpoint_with_cancellation($name, None, $cancel) {
either::Either::Left(_) => {},
either::Either::Right(future) => {
future.await;
},
}
}
}};
}
@@ -144,17 +367,66 @@ macro_rules! failpoint_context {
macro_rules! failpoint_return {
($name:literal) => {{
if cfg!(feature = "testing") {
if let $crate::FailpointResult::Return(value) = $crate::failpoint($name, None).await {
return value.parse().unwrap_or_default();
match $crate::failpoint($name, None) {
either::Either::Left(result) => {
if let $crate::FailpointResult::Return(Some(value)) = result {
return value.parse().unwrap_or_default();
}
},
either::Either::Right(future) => {
if let $crate::FailpointResult::Return(Some(value)) = future.await {
return value.parse().unwrap_or_default();
}
},
}
}
}};
($name:literal, $context:expr) => {{
if cfg!(feature = "testing") {
if let $crate::FailpointResult::Return(value) =
$crate::failpoint($name, Some($context)).await
{
return value.parse().unwrap_or_default();
match $crate::failpoint($name, Some($context)) {
either::Either::Left(result) => {
if let $crate::FailpointResult::Return(Some(value)) = result {
return value.parse().unwrap_or_default();
}
},
either::Either::Right(future) => {
if let $crate::FailpointResult::Return(Some(value)) = future.await {
return value.parse().unwrap_or_default();
}
},
}
}
}};
}
/// Macro for simple failpoint calls that might return early - sync version
#[macro_export]
macro_rules! failpoint_return_sync {
($name:literal) => {{
if cfg!(feature = "testing") {
match $crate::failpoint($name, None) {
either::Either::Left(result) => {
if let $crate::FailpointResult::Return(Some(value)) = result {
return value.parse().unwrap_or_default();
}
},
either::Either::Right(_) => {
panic!("failpoint '{}' triggered an async action (sleep/pause) but failpoint_return_sync! was used. Use failpoint_return! instead.", $name);
},
}
}
}};
($name:literal, $context:expr) => {{
if cfg!(feature = "testing") {
match $crate::failpoint($name, Some($context)) {
either::Either::Left(result) => {
if let $crate::FailpointResult::Return(Some(value)) = result {
return value.parse().unwrap_or_default();
}
},
either::Either::Right(_) => {
panic!("failpoint '{}' triggered an async action (sleep/pause) but failpoint_return_sync! was used. Use failpoint_return! instead.", $name);
},
}
}
}};
@@ -165,17 +437,66 @@ macro_rules! failpoint_return {
macro_rules! failpoint_bail {
($name:literal, $error_msg:literal) => {{
if cfg!(feature = "testing") {
if let $crate::FailpointResult::Return(_) = $crate::failpoint($name, None).await {
anyhow::bail!($error_msg);
match $crate::failpoint($name, None) {
either::Either::Left(result) => {
if let $crate::FailpointResult::Return(_) = result {
anyhow::bail!($error_msg);
}
},
either::Either::Right(future) => {
if let $crate::FailpointResult::Return(_) = future.await {
anyhow::bail!($error_msg);
}
},
}
}
}};
($name:literal, $context:expr, $error_msg:literal) => {{
if cfg!(feature = "testing") {
if let $crate::FailpointResult::Return(_) =
$crate::failpoint($name, Some($context)).await
{
anyhow::bail!($error_msg);
match $crate::failpoint($name, Some($context)) {
either::Either::Left(result) => {
if let $crate::FailpointResult::Return(_) = result {
anyhow::bail!($error_msg);
}
},
either::Either::Right(future) => {
if let $crate::FailpointResult::Return(_) = future.await {
anyhow::bail!($error_msg);
}
},
}
}
}};
}
/// Macro for failpoint calls that might bail with an error - sync version
#[macro_export]
macro_rules! failpoint_bail_sync {
($name:literal, $error_msg:literal) => {{
if cfg!(feature = "testing") {
match $crate::failpoint($name, None) {
either::Either::Left(result) => {
if let $crate::FailpointResult::Return(_) = result {
anyhow::bail!($error_msg);
}
},
either::Either::Right(_) => {
panic!("failpoint '{}' triggered an async action (sleep/pause) but failpoint_bail_sync! was used. Use failpoint_bail! instead.", $name);
},
}
}
}};
($name:literal, $context:expr, $error_msg:literal) => {{
if cfg!(feature = "testing") {
match $crate::failpoint($name, Some($context)) {
either::Either::Left(result) => {
if let $crate::FailpointResult::Return(_) = result {
anyhow::bail!($error_msg);
}
},
either::Either::Right(_) => {
panic!("failpoint '{}' triggered an async action (sleep/pause) but failpoint_bail_sync! was used. Use failpoint_bail! instead.", $name);
},
}
}
}};
@@ -183,9 +504,13 @@ macro_rules! failpoint_bail {
// Re-export for convenience
pub use fail_point;
pub use fail_point_sync;
pub use fail_point_with_context;
pub use fail_point_with_context_sync;
pub use failpoint_bail;
pub use failpoint_bail_sync;
pub use failpoint_context;
pub use failpoint_return;
pub use failpoint_return_sync;
pub use pausable_failpoint;
pub use sleep_millis_async;

View File

@@ -21,6 +21,7 @@ bytes.workspace = true
camino.workspace = true
chrono.workspace = true
diatomic-waker.workspace = true
either.workspace = true
git-version.workspace = true
hex = { workspace = true, features = ["serde"] }
humantime.workspace = true

View File

@@ -5,38 +5,17 @@
pub use neon_failpoint::{configure_failpoint as apply_failpoint, has_failpoints, init};
use tokio_util::sync::CancellationToken;
/// Declare a failpoint that can use to `pause` failpoint action.
/// This is now a compatibility wrapper around the new neon_failpoint crate.
///
/// Optionally pass a cancellation token, and this failpoint will drop out of
/// its pause when the cancellation token fires. This is useful for testing
/// cases where we would like to block something, but test its clean shutdown behavior.
/// The macro evaluates to a Result in that case, where Ok(()) is the case
/// where the failpoint was not paused, and Err() is the case where cancellation
/// token fired while evaluating the failpoint.
/// Simply forwards to `neon_failpoint::pausable_failpoint`.
#[macro_export]
macro_rules! pausable_failpoint {
($name:literal) => {{
if cfg!(feature = "testing") {
let cancel = ::tokio_util::sync::CancellationToken::new();
let _ = $crate::pausable_failpoint!($name, &cancel);
}
}};
($name:literal, $cancel:expr) => {{
if cfg!(feature = "testing") {
match ::neon_failpoint::failpoint_with_cancellation($name, None, $cancel).await {
::neon_failpoint::FailpointResult::Continue => Ok(()),
::neon_failpoint::FailpointResult::Return(_) => Ok(()),
::neon_failpoint::FailpointResult::Cancelled => Err(()),
}
} else {
Ok(())
}
}};
($name:literal) => {
::neon_failpoint::pausable_failpoint!($name)
};
($name:literal, $cancel:expr) => {
::neon_failpoint::pausable_failpoint!($name, $cancel)
};
}
pub use pausable_failpoint;
/// use with neon_failpoint::configure_failpoint("$name", "sleep(2000)")
///
/// The effect is similar to a "sleep(2000)" action, i.e. we sleep for the

View File

@@ -131,7 +131,7 @@ pub(super) async fn flush_metrics_to_disk(
tempfile.flush()?;
tempfile.as_file().sync_all()?;
fail::fail_point!("before-persist-last-metrics-collected");
fail::fail_point_sync!("before-persist-last-metrics-collected");
drop(tempfile.persist(&*path).map_err(|e| e.error)?);

View File

@@ -336,7 +336,7 @@ async fn page_service_conn_main(
let default_timeout_ms = 10 * 60 * 1000; // 10 minutes by default
let socket_timeout_ms = (|| {
fail::fail_point!("simulated-bad-compute-connection", |avg_timeout_ms| {
fail::fail_point_sync!("simulated-bad-compute-connection", |avg_timeout_ms| {
// Exponential distribution for simulating
// poor network conditions, expect about avg_timeout_ms to be around 15
// in tests
@@ -3044,7 +3044,7 @@ where
_pgb: &mut PostgresBackend<IO>,
sm: &FeStartupPacket,
) -> Result<(), QueryError> {
fail::fail_point!("ps::connection-start::startup-packet");
fail::fail_point_sync!("ps::connection-start::startup-packet");
if let FeStartupPacket::StartupMessage { params, .. } = sm {
if let Some(app_name) = params.get("application_name") {

View File

@@ -5184,7 +5184,7 @@ impl Timeline {
*self.applied_gc_cutoff_lsn.read(),
);
fail_point!("checkpoint-before-saving-metadata", |x| bail!(
neon_failpoint::fail_point_sync!("checkpoint-before-saving-metadata", |x| bail!(
"{}",
x.unwrap()
));

View File

@@ -1114,7 +1114,7 @@ pub(super) async fn detach_and_reparent(
// others will fail as if those timelines had been stopped for whatever reason.
#[cfg(feature = "testing")]
let failpoint_sem = || -> Option<Arc<Semaphore>> {
fail::fail_point!("timeline-detach-ancestor::allow_one_reparented", |_| Some(
fail::fail_point_sync!("timeline-detach-ancestor::allow_one_reparented", |_| Some(
Arc::new(Semaphore::new(1))
));
None