diff --git a/Cargo.lock b/Cargo.lock index fe30d5246f..af51678343 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3857,6 +3857,7 @@ name = "neon_failpoint" version = "0.1.0" dependencies = [ "anyhow", + "either", "once_cell", "parking_lot 0.12.1", "rand 0.8.5", @@ -8180,6 +8181,7 @@ dependencies = [ "const_format", "criterion", "diatomic-waker", + "either", "futures", "git-version", "hex", diff --git a/libs/neon_failpoint/Cargo.toml b/libs/neon_failpoint/Cargo.toml index 9936d72a1a..bfb8e33666 100644 --- a/libs/neon_failpoint/Cargo.toml +++ b/libs/neon_failpoint/Cargo.toml @@ -13,6 +13,7 @@ regex = { workspace = true } once_cell = { workspace = true } parking_lot = { workspace = true } rand = { workspace = true } +either = { workspace = true } [dev-dependencies] tracing-subscriber = { workspace = true, features = ["fmt"] } @@ -23,4 +24,4 @@ testing = [] [[example]] name = "context_demo" -required-features = ["testing"] \ No newline at end of file +required-features = ["testing"] \ No newline at end of file diff --git a/libs/neon_failpoint/src/lib.rs b/libs/neon_failpoint/src/lib.rs index 08e72f08c8..eea54cc1cf 100644 --- a/libs/neon_failpoint/src/lib.rs +++ b/libs/neon_failpoint/src/lib.rs @@ -9,8 +9,11 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; +use std::pin::Pin; +use std::future::Future; use anyhow::Result; +use either::Either; use once_cell::sync::Lazy; use parking_lot::RwLock; use rand::Rng; @@ -170,19 +173,19 @@ pub fn has_failpoints() -> bool { } /// Execute a failpoint with optional context -pub async fn failpoint(name: &str, context: Option<&FailpointContext>) -> FailpointResult { - failpoint_with_cancellation(name, context, &CancellationToken::new()).await +pub fn failpoint(name: &str, context: Option<&FailpointContext>) -> Either + Send>>> { + failpoint_with_cancellation(name, context, &CancellationToken::new()) } /// Execute a failpoint with cancellation support -pub async fn failpoint_with_cancellation( +pub fn failpoint_with_cancellation( name: &str, context: Option<&FailpointContext>, cancel_token: &CancellationToken, -) -> FailpointResult { +) -> Either + Send>>> { // Only check failpoints if testing feature is enabled if !cfg!(feature = "testing") { - return FailpointResult::Continue; + return Either::Left(FailpointResult::Continue); } let config = { @@ -191,13 +194,13 @@ pub async fn failpoint_with_cancellation( }; let Some(config) = config else { - return FailpointResult::Continue; + return Either::Left(FailpointResult::Continue); }; // Check context matchers if provided if let (Some(matchers), Some(ctx)) = (&config.context_matchers, context) { if !matches_context(matchers, ctx) { - return FailpointResult::Continue; + return Either::Left(FailpointResult::Continue); } } @@ -206,7 +209,7 @@ pub async fn failpoint_with_cancellation( // Check if we've hit the max count if let Some(max_count) = config.action_spec.max_count { if config.trigger_count >= max_count { - return FailpointResult::Continue; + return Either::Left(FailpointResult::Continue); } } @@ -214,7 +217,7 @@ pub async fn failpoint_with_cancellation( let mut rng = rand::thread_rng(); let roll: u8 = rng.gen_range(1..=100); if roll > probability { - return FailpointResult::Continue; + return Either::Left(FailpointResult::Continue); } // Increment trigger count @@ -228,24 +231,36 @@ pub async fn failpoint_with_cancellation( tracing::info!("Hit failpoint: {}", name); - execute_action(name, &config.action_spec, context, cancel_token).await + execute_action(name, &config.action_spec, context, cancel_token) } /// Execute a specific action (used for recursive execution in probability-based actions) -fn execute_action<'a>( - name: &'a str, - action_spec: &'a FailpointActionSpec, - _context: Option<&'a FailpointContext>, - cancel_token: &'a CancellationToken, -) -> std::pin::Pin + Send + 'a>> { - Box::pin(async move { - match &action_spec.action { - FailpointAction::Off => FailpointResult::Continue, - FailpointAction::Return(value) => { - tracing::info!("Failpoint {} returning: {:?}", name, value); - FailpointResult::Return(value.clone()) - } - FailpointAction::Sleep(millis) => { +fn execute_action( + name: &str, + action_spec: &FailpointActionSpec, + _context: Option<&FailpointContext>, + cancel_token: &CancellationToken, +) -> Either + Send>>> { + match &action_spec.action { + FailpointAction::Off => Either::Left(FailpointResult::Continue), + FailpointAction::Return(value) => { + tracing::info!("Failpoint {} returning: {:?}", name, value); + Either::Left(FailpointResult::Return(value.clone())) + } + FailpointAction::Exit => { + tracing::info!("Failpoint {} exiting process", name); + std::process::exit(1); + } + FailpointAction::Panic(message) => { + tracing::error!("Failpoint {} panicking with message: {}", name, message); + panic!("Failpoint panicked: {}", message); + } + FailpointAction::Sleep(millis) => { + let millis = *millis; + let name = name.to_string(); + let cancel_token = cancel_token.clone(); + + Either::Right(Box::pin(async move { tracing::info!("Failpoint {} sleeping for {}ms", name, millis); // Create a notifier for this task @@ -254,20 +269,20 @@ fn execute_action<'a>( // Add the notifier to the failpoint configuration { let mut failpoints = FAILPOINTS.write(); - if let Some(fp_config) = failpoints.get_mut(name) { + if let Some(fp_config) = failpoints.get_mut(&name) { fp_config.notifiers.push(notifier.clone()); } } // Create cleanup guard to remove notifier when done let _guard = NotifierCleanupGuard { - failpoint_name: name.to_string(), + failpoint_name: name.clone(), notifier: notifier.clone(), }; // Sleep with cancellation support tokio::select! { - _ = tokio::time::sleep(Duration::from_millis(*millis)) => { + _ = tokio::time::sleep(Duration::from_millis(millis)) => { tracing::info!("Failpoint {} finished sleeping", name); FailpointResult::Continue } @@ -280,8 +295,13 @@ fn execute_action<'a>( FailpointResult::Continue } } - } - FailpointAction::Pause => { + })) + } + FailpointAction::Pause => { + let name = name.to_string(); + let cancel_token = cancel_token.clone(); + + Either::Right(Box::pin(async move { tracing::info!("Failpoint {} pausing", name); // Create a notifier for this task @@ -290,14 +310,14 @@ fn execute_action<'a>( // Add the notifier to the failpoint configuration { let mut failpoints = FAILPOINTS.write(); - if let Some(fp_config) = failpoints.get_mut(name) { + if let Some(fp_config) = failpoints.get_mut(&name) { fp_config.notifiers.push(notifier.clone()); } } // Create cleanup guard to remove notifier when done let _guard = NotifierCleanupGuard { - failpoint_name: name.to_string(), + failpoint_name: name.clone(), notifier: notifier.clone(), }; @@ -312,17 +332,9 @@ fn execute_action<'a>( FailpointResult::Continue } } - } - FailpointAction::Exit => { - tracing::info!("Failpoint {} exiting process", name); - std::process::exit(1); - } - FailpointAction::Panic(message) => { - tracing::error!("Failpoint {} panicking with message: {}", name, message); - panic!("Failpoint panicked: {}", message); - } + })) } - }) + } } /// Parse an action string into a FailpointActionSpec @@ -439,17 +451,27 @@ mod tests { use super::*; use tokio::time::timeout; + // Helper function to await either sync or async failpoint results + async fn await_failpoint_result( + either: Either + Send>>>, + ) -> FailpointResult { + match either { + Either::Left(result) => result, + Either::Right(future) => future.await, + } + } + #[tokio::test] async fn test_failpoint_off() { configure_failpoint("test_off", "off").unwrap(); - let result = failpoint("test_off", None).await; + let result = await_failpoint_result(failpoint("test_off", None)).await; matches!(result, FailpointResult::Continue); } #[tokio::test] async fn test_failpoint_return() { configure_failpoint("test_return", "return(42)").unwrap(); - let result = failpoint("test_return", None).await; + let result = await_failpoint_result(failpoint("test_return", None)).await; if let FailpointResult::Return(Some(value)) = result { assert_eq!(value, "42"); } else { @@ -461,7 +483,7 @@ mod tests { async fn test_failpoint_sleep() { configure_failpoint("test_sleep", "sleep(10)").unwrap(); let start = std::time::Instant::now(); - let result = failpoint("test_sleep", None).await; + let result = await_failpoint_result(failpoint("test_sleep", None)).await; let duration = start.elapsed(); matches!(result, FailpointResult::Continue); @@ -480,7 +502,7 @@ mod tests { cancel_token_clone.cancel(); }); - let result = failpoint_with_cancellation("test_pause", None, &cancel_token).await; + let result = await_failpoint_result(failpoint_with_cancellation("test_pause", None, &cancel_token)).await; matches!(result, FailpointResult::Cancelled); } @@ -492,7 +514,9 @@ mod tests { let start = std::time::Instant::now(); // Start a task that hits the failpoint - let task = tokio::spawn(async move { failpoint(failpoint_name, None).await }); + let task = tokio::spawn(async move { + await_failpoint_result(failpoint(failpoint_name, None)).await + }); // Give it time to hit the failpoint and pause tokio::time::sleep(Duration::from_millis(10)).await; @@ -522,7 +546,9 @@ mod tests { configure_failpoint(failpoint_name, "pause").unwrap(); // Start a task that hits the failpoint - let task = tokio::spawn(async move { failpoint(failpoint_name, None).await }); + let task = tokio::spawn(async move { + await_failpoint_result(failpoint(failpoint_name, None)).await + }); // Give it time to hit the failpoint and pause tokio::time::sleep(Duration::from_millis(10)).await; @@ -548,7 +574,9 @@ mod tests { let start = std::time::Instant::now(); // Start a task that hits the failpoint - let task = tokio::spawn(async move { failpoint(failpoint_name, None).await }); + let task = tokio::spawn(async move { + await_failpoint_result(failpoint(failpoint_name, None)).await + }); // Give it time to hit the failpoint and start sleeping tokio::time::sleep(Duration::from_millis(10)).await; @@ -579,7 +607,9 @@ mod tests { let start = std::time::Instant::now(); // Start a task that hits the failpoint - let task = tokio::spawn(async move { failpoint(failpoint_name, None).await }); + let task = tokio::spawn(async move { + await_failpoint_result(failpoint(failpoint_name, None)).await + }); // Give it time to hit the failpoint and start sleeping tokio::time::sleep(Duration::from_millis(10)).await; @@ -608,7 +638,9 @@ mod tests { configure_failpoint(failpoint_name, "pause").unwrap(); // Start a task that hits the failpoint - let task = tokio::spawn(async move { failpoint(failpoint_name, None).await }); + let task = tokio::spawn(async move { + await_failpoint_result(failpoint(failpoint_name, None)).await + }); // Give it time to hit the failpoint and pause tokio::time::sleep(Duration::from_millis(10)).await; @@ -632,11 +664,17 @@ mod tests { configure_failpoint(failpoint_name, "pause").unwrap(); // Start multiple tasks that hit the same failpoint - let task1 = tokio::spawn(async move { failpoint(failpoint_name, None).await }); + let task1 = tokio::spawn(async move { + await_failpoint_result(failpoint(failpoint_name, None)).await + }); - let task2 = tokio::spawn(async move { failpoint(failpoint_name, None).await }); + let task2 = tokio::spawn(async move { + await_failpoint_result(failpoint(failpoint_name, None)).await + }); - let task3 = tokio::spawn(async move { failpoint(failpoint_name, None).await }); + let task3 = tokio::spawn(async move { + await_failpoint_result(failpoint(failpoint_name, None)).await + }); // Give them time to hit the failpoint and pause tokio::time::sleep(Duration::from_millis(10)).await; @@ -672,7 +710,9 @@ mod tests { configure_failpoint(failpoint_name, "pause").unwrap(); // Start a task that will hit the failpoint - let task = tokio::spawn(async move { failpoint(failpoint_name, None).await }); + let task = tokio::spawn(async move { + await_failpoint_result(failpoint(failpoint_name, None)).await + }); // Rapidly change the configuration multiple times for i in 0..10 { @@ -709,7 +749,7 @@ mod tests { configure_failpoint_with_context("test_context", "return(matched)", matchers).unwrap(); - let result = failpoint("test_context", Some(&context)).await; + let result = await_failpoint_result(failpoint("test_context", Some(&context))).await; if let FailpointResult::Return(Some(value)) = result { assert_eq!(value, "matched"); } else { @@ -721,7 +761,7 @@ mod tests { wrong_context.insert("user_id".to_string(), "456".to_string()); wrong_context.insert("operation".to_string(), "write".to_string()); - let result = failpoint("test_context", Some(&wrong_context)).await; + let result = await_failpoint_result(failpoint("test_context", Some(&wrong_context))).await; matches!(result, FailpointResult::Continue); } @@ -732,7 +772,9 @@ mod tests { let result = tokio::task::spawn_blocking(|| { std::panic::catch_unwind(|| { let rt = tokio::runtime::Runtime::new().unwrap(); - rt.block_on(async { failpoint("test_panic", None).await }) + rt.block_on(async { + await_failpoint_result(failpoint("test_panic", None)).await + }) }) }) .await @@ -754,7 +796,7 @@ mod tests { configure_failpoint("test_prob_simple", "100%return(always)").unwrap(); // 100% probability should always trigger - let result = failpoint("test_prob_simple", None).await; + let result = await_failpoint_result(failpoint("test_prob_simple", None)).await; if let FailpointResult::Return(Some(value)) = result { assert_eq!(value, "always"); } else { @@ -763,7 +805,7 @@ mod tests { // 0% probability should never trigger configure_failpoint("test_prob_never", "0%return(never)").unwrap(); - let result = failpoint("test_prob_never", None).await; + let result = await_failpoint_result(failpoint("test_prob_never", None)).await; matches!(result, FailpointResult::Continue); } @@ -772,14 +814,14 @@ mod tests { configure_failpoint("test_prob_count", "100%2*return(limited)").unwrap(); // First two calls should trigger (100% probability, max 2 times) - let result1 = failpoint("test_prob_count", None).await; + let result1 = await_failpoint_result(failpoint("test_prob_count", None)).await; if let FailpointResult::Return(Some(value)) = result1 { assert_eq!(value, "limited"); } else { panic!("Expected return result on first call"); } - let result2 = failpoint("test_prob_count", None).await; + let result2 = await_failpoint_result(failpoint("test_prob_count", None)).await; if let FailpointResult::Return(Some(value)) = result2 { assert_eq!(value, "limited"); } else { @@ -787,7 +829,7 @@ mod tests { } // Third call should not trigger (count limit exceeded) - let result3 = failpoint("test_prob_count", None).await; + let result3 = await_failpoint_result(failpoint("test_prob_count", None)).await; matches!(result3, FailpointResult::Continue); } @@ -796,7 +838,7 @@ mod tests { configure_failpoint("test_prob_sleep", "100%sleep(10)").unwrap(); let start = std::time::Instant::now(); - let result = failpoint("test_prob_sleep", None).await; + let result = await_failpoint_result(failpoint("test_prob_sleep", None)).await; let duration = start.elapsed(); matches!(result, FailpointResult::Continue); @@ -885,7 +927,7 @@ mod tests { configure_failpoint("test_audit_count", "100%1*return(limited)").unwrap(); // Test the probability-based return - let result = failpoint("test_audit_prob", None).await; + let result = await_failpoint_result(failpoint("test_audit_prob", None)).await; if let FailpointResult::Return(Some(value)) = result { assert_eq!(value, "42"); } else { @@ -893,7 +935,7 @@ mod tests { } // Test the count-limited probability - let result1 = failpoint("test_audit_count", None).await; + let result1 = await_failpoint_result(failpoint("test_audit_count", None)).await; if let FailpointResult::Return(Some(value)) = result1 { assert_eq!(value, "limited"); } else { @@ -901,7 +943,7 @@ mod tests { } // Second call should not trigger (count limit exceeded) - let result2 = failpoint("test_audit_count", None).await; + let result2 = await_failpoint_result(failpoint("test_audit_count", None)).await; matches!(result2, FailpointResult::Continue); } } diff --git a/libs/neon_failpoint/src/macros.rs b/libs/neon_failpoint/src/macros.rs index 4db7445147..e3f093e7e3 100644 --- a/libs/neon_failpoint/src/macros.rs +++ b/libs/neon_failpoint/src/macros.rs @@ -5,40 +5,141 @@ macro_rules! fail_point { ($name:literal) => {{ if cfg!(feature = "testing") { - match $crate::failpoint($name, None).await { - $crate::FailpointResult::Continue => {}, - $crate::FailpointResult::Return(None) => { - return; + match $crate::failpoint($name, None) { + either::Either::Left(result) => { + match result { + $crate::FailpointResult::Continue => {}, + $crate::FailpointResult::Return(_) => { + panic!("failpoint was configured with return(X) but Rust code does not pass a closure to map X to a return value"); + }, + $crate::FailpointResult::Cancelled => {}, + } }, - $crate::FailpointResult::Return(Some(value)) => { - panic!("failpoint was configured with return(X) but Rust code does not pass a closure to map X to a return value"); + either::Either::Right(future) => { + match future.await { + $crate::FailpointResult::Continue => {}, + $crate::FailpointResult::Return(_) => { + panic!("failpoint was configured with return(X) but Rust code does not pass a closure to map X to a return value"); + }, + $crate::FailpointResult::Cancelled => {}, + } }, - $crate::FailpointResult::Cancelled => {}, } } }}; ($name:literal, $closure:expr) => {{ if cfg!(feature = "testing") { - match $crate::failpoint($name, None).await { - $crate::FailpointResult::Continue => {}, - $crate::FailpointResult::Return(value) => { - let closure = $closure; - return closure(value); + match $crate::failpoint($name, None) { + either::Either::Left(result) => { + match result { + $crate::FailpointResult::Continue => {}, + $crate::FailpointResult::Return(value) => { + let closure = $closure; + return closure(value); + }, + $crate::FailpointResult::Cancelled => {}, + } + }, + either::Either::Right(future) => { + match future.await { + $crate::FailpointResult::Continue => {}, + $crate::FailpointResult::Return(value) => { + let closure = $closure; + return closure(value); + }, + $crate::FailpointResult::Cancelled => {}, + } }, - $crate::FailpointResult::Cancelled => {}, } } }}; ($name:literal, $condition:expr, $closure:expr) => {{ if cfg!(feature = "testing") { if $condition { - match $crate::failpoint($name, None).await { - $crate::FailpointResult::Continue => {}, - $crate::FailpointResult::Return(value) => { - let closure = $closure; - return closure(value); + match $crate::failpoint($name, None) { + either::Either::Left(result) => { + match result { + $crate::FailpointResult::Continue => {}, + $crate::FailpointResult::Return(value) => { + let closure = $closure; + return closure(value); + }, + $crate::FailpointResult::Cancelled => {}, + } + }, + either::Either::Right(future) => { + match future.await { + $crate::FailpointResult::Continue => {}, + $crate::FailpointResult::Return(value) => { + let closure = $closure; + return closure(value); + }, + $crate::FailpointResult::Cancelled => {}, + } + }, + } + } + } + }}; +} + +/// Simple failpoint macro - sync version that panics if async action is triggered +#[macro_export] +macro_rules! fail_point_sync { + ($name:literal) => {{ + if cfg!(feature = "testing") { + match $crate::failpoint($name, None) { + either::Either::Left(result) => { + match result { + $crate::FailpointResult::Continue => {}, + $crate::FailpointResult::Return(_) => { + panic!("failpoint was configured with return(X) but Rust code does not pass a closure to map X to a return value"); + }, + $crate::FailpointResult::Cancelled => {}, + } + }, + either::Either::Right(_) => { + panic!("failpoint '{}' triggered an async action (sleep/pause) but fail_point_sync! was used. Use fail_point! instead.", $name); + }, + } + } + }}; + ($name:literal, $closure:expr) => {{ + if cfg!(feature = "testing") { + match $crate::failpoint($name, None) { + either::Either::Left(result) => { + match result { + $crate::FailpointResult::Continue => {}, + $crate::FailpointResult::Return(value) => { + let closure = $closure; + return closure(value); + }, + $crate::FailpointResult::Cancelled => {}, + } + }, + either::Either::Right(_) => { + panic!("failpoint '{}' triggered an async action (sleep/pause) but fail_point_sync! was used. Use fail_point! instead.", $name); + }, + } + } + }}; + ($name:literal, $condition:expr, $closure:expr) => {{ + if cfg!(feature = "testing") { + if $condition { + match $crate::failpoint($name, None) { + either::Either::Left(result) => { + match result { + $crate::FailpointResult::Continue => {}, + $crate::FailpointResult::Return(value) => { + let closure = $closure; + return closure(value); + }, + $crate::FailpointResult::Cancelled => {}, + } + }, + either::Either::Right(_) => { + panic!("failpoint '{}' triggered an async action (sleep/pause) but fail_point_sync! was used. Use fail_point! instead.", $name); }, - $crate::FailpointResult::Cancelled => {}, } } } @@ -50,40 +151,141 @@ macro_rules! fail_point { macro_rules! fail_point_with_context { ($name:literal, $context:expr) => {{ if cfg!(feature = "testing") { - match $crate::failpoint($name, Some($context)).await { - $crate::FailpointResult::Continue => {}, - $crate::FailpointResult::Return(None) => { - return; + match $crate::failpoint($name, Some($context)) { + either::Either::Left(result) => { + match result { + $crate::FailpointResult::Continue => {}, + $crate::FailpointResult::Return(_) => { + panic!("failpoint was configured with return(X) but Rust code does not pass a closure to map X to a return value"); + }, + $crate::FailpointResult::Cancelled => {}, + } }, - $crate::FailpointResult::Return(Some(value)) => { - panic!("failpoint was configured with return(X) but Rust code does not pass a closure to map X to a return value"); + either::Either::Right(future) => { + match future.await { + $crate::FailpointResult::Continue => {}, + $crate::FailpointResult::Return(_) => { + panic!("failpoint was configured with return(X) but Rust code does not pass a closure to map X to a return value"); + }, + $crate::FailpointResult::Cancelled => {}, + } }, - $crate::FailpointResult::Cancelled => {}, } } }}; ($name:literal, $context:expr, $closure:expr) => {{ if cfg!(feature = "testing") { - match $crate::failpoint($name, Some($context)).await { - $crate::FailpointResult::Continue => {}, - $crate::FailpointResult::Return(value) => { - let closure = $closure; - return closure(value); + match $crate::failpoint($name, Some($context)) { + either::Either::Left(result) => { + match result { + $crate::FailpointResult::Continue => {}, + $crate::FailpointResult::Return(value) => { + let closure = $closure; + return closure(value); + }, + $crate::FailpointResult::Cancelled => {}, + } + }, + either::Either::Right(future) => { + match future.await { + $crate::FailpointResult::Continue => {}, + $crate::FailpointResult::Return(value) => { + let closure = $closure; + return closure(value); + }, + $crate::FailpointResult::Cancelled => {}, + } }, - $crate::FailpointResult::Cancelled => {}, } } }}; ($name:literal, $context:expr, $condition:expr, $closure:expr) => {{ if cfg!(feature = "testing") { if $condition { - match $crate::failpoint($name, Some($context)).await { - $crate::FailpointResult::Continue => {}, - $crate::FailpointResult::Return(value) => { - let closure = $closure; - return closure(value); + match $crate::failpoint($name, Some($context)) { + either::Either::Left(result) => { + match result { + $crate::FailpointResult::Continue => {}, + $crate::FailpointResult::Return(value) => { + let closure = $closure; + return closure(value); + }, + $crate::FailpointResult::Cancelled => {}, + } + }, + either::Either::Right(future) => { + match future.await { + $crate::FailpointResult::Continue => {}, + $crate::FailpointResult::Return(value) => { + let closure = $closure; + return closure(value); + }, + $crate::FailpointResult::Cancelled => {}, + } + }, + } + } + } + }}; +} + +/// Failpoint macro with context support - sync version +#[macro_export] +macro_rules! fail_point_with_context_sync { + ($name:literal, $context:expr) => {{ + if cfg!(feature = "testing") { + match $crate::failpoint($name, Some($context)) { + either::Either::Left(result) => { + match result { + $crate::FailpointResult::Continue => {}, + $crate::FailpointResult::Return(_) => { + panic!("failpoint was configured with return(X) but Rust code does not pass a closure to map X to a return value"); + }, + $crate::FailpointResult::Cancelled => {}, + } + }, + either::Either::Right(_) => { + panic!("failpoint '{}' triggered an async action (sleep/pause) but fail_point_with_context_sync! was used. Use fail_point_with_context! instead.", $name); + }, + } + } + }}; + ($name:literal, $context:expr, $closure:expr) => {{ + if cfg!(feature = "testing") { + match $crate::failpoint($name, Some($context)) { + either::Either::Left(result) => { + match result { + $crate::FailpointResult::Continue => {}, + $crate::FailpointResult::Return(value) => { + let closure = $closure; + return closure(value); + }, + $crate::FailpointResult::Cancelled => {}, + } + }, + either::Either::Right(_) => { + panic!("failpoint '{}' triggered an async action (sleep/pause) but fail_point_with_context_sync! was used. Use fail_point_with_context! instead.", $name); + }, + } + } + }}; + ($name:literal, $context:expr, $condition:expr, $closure:expr) => {{ + if cfg!(feature = "testing") { + if $condition { + match $crate::failpoint($name, Some($context)) { + either::Either::Left(result) => { + match result { + $crate::FailpointResult::Continue => {}, + $crate::FailpointResult::Return(value) => { + let closure = $closure; + return closure(value); + }, + $crate::FailpointResult::Cancelled => {}, + } + }, + either::Either::Right(_) => { + panic!("failpoint '{}' triggered an async action (sleep/pause) but fail_point_with_context_sync! was used. Use fail_point_with_context! instead.", $name); }, - $crate::FailpointResult::Cancelled => {}, } } } @@ -101,10 +303,21 @@ macro_rules! pausable_failpoint { }}; ($name:literal, $cancel:expr) => {{ if cfg!(feature = "testing") { - match $crate::failpoint_with_cancellation($name, None, $cancel).await { - $crate::FailpointResult::Continue => Ok(()), - $crate::FailpointResult::Return(_) => Ok(()), - $crate::FailpointResult::Cancelled => Err(()), + match $crate::failpoint_with_cancellation($name, None, $cancel) { + either::Either::Left(result) => { + match result { + $crate::FailpointResult::Continue => Ok(()), + $crate::FailpointResult::Return(_) => Ok(()), + $crate::FailpointResult::Cancelled => Err(()), + } + }, + either::Either::Right(future) => { + match future.await { + $crate::FailpointResult::Continue => Ok(()), + $crate::FailpointResult::Return(_) => Ok(()), + $crate::FailpointResult::Cancelled => Err(()), + } + }, } } else { Ok(()) @@ -117,12 +330,22 @@ macro_rules! pausable_failpoint { macro_rules! sleep_millis_async { ($name:literal) => {{ if cfg!(feature = "testing") { - $crate::failpoint($name, None).await; + match $crate::failpoint($name, None) { + either::Either::Left(_) => {}, + either::Either::Right(future) => { + future.await; + }, + } } }}; ($name:literal, $cancel:expr) => {{ if cfg!(feature = "testing") { - $crate::failpoint_with_cancellation($name, None, $cancel).await; + match $crate::failpoint_with_cancellation($name, None, $cancel) { + either::Either::Left(_) => {}, + either::Either::Right(future) => { + future.await; + }, + } } }}; } @@ -144,17 +367,66 @@ macro_rules! failpoint_context { macro_rules! failpoint_return { ($name:literal) => {{ if cfg!(feature = "testing") { - if let $crate::FailpointResult::Return(value) = $crate::failpoint($name, None).await { - return value.parse().unwrap_or_default(); + match $crate::failpoint($name, None) { + either::Either::Left(result) => { + if let $crate::FailpointResult::Return(Some(value)) = result { + return value.parse().unwrap_or_default(); + } + }, + either::Either::Right(future) => { + if let $crate::FailpointResult::Return(Some(value)) = future.await { + return value.parse().unwrap_or_default(); + } + }, } } }}; ($name:literal, $context:expr) => {{ if cfg!(feature = "testing") { - if let $crate::FailpointResult::Return(value) = - $crate::failpoint($name, Some($context)).await - { - return value.parse().unwrap_or_default(); + match $crate::failpoint($name, Some($context)) { + either::Either::Left(result) => { + if let $crate::FailpointResult::Return(Some(value)) = result { + return value.parse().unwrap_or_default(); + } + }, + either::Either::Right(future) => { + if let $crate::FailpointResult::Return(Some(value)) = future.await { + return value.parse().unwrap_or_default(); + } + }, + } + } + }}; +} + +/// Macro for simple failpoint calls that might return early - sync version +#[macro_export] +macro_rules! failpoint_return_sync { + ($name:literal) => {{ + if cfg!(feature = "testing") { + match $crate::failpoint($name, None) { + either::Either::Left(result) => { + if let $crate::FailpointResult::Return(Some(value)) = result { + return value.parse().unwrap_or_default(); + } + }, + either::Either::Right(_) => { + panic!("failpoint '{}' triggered an async action (sleep/pause) but failpoint_return_sync! was used. Use failpoint_return! instead.", $name); + }, + } + } + }}; + ($name:literal, $context:expr) => {{ + if cfg!(feature = "testing") { + match $crate::failpoint($name, Some($context)) { + either::Either::Left(result) => { + if let $crate::FailpointResult::Return(Some(value)) = result { + return value.parse().unwrap_or_default(); + } + }, + either::Either::Right(_) => { + panic!("failpoint '{}' triggered an async action (sleep/pause) but failpoint_return_sync! was used. Use failpoint_return! instead.", $name); + }, } } }}; @@ -165,17 +437,66 @@ macro_rules! failpoint_return { macro_rules! failpoint_bail { ($name:literal, $error_msg:literal) => {{ if cfg!(feature = "testing") { - if let $crate::FailpointResult::Return(_) = $crate::failpoint($name, None).await { - anyhow::bail!($error_msg); + match $crate::failpoint($name, None) { + either::Either::Left(result) => { + if let $crate::FailpointResult::Return(_) = result { + anyhow::bail!($error_msg); + } + }, + either::Either::Right(future) => { + if let $crate::FailpointResult::Return(_) = future.await { + anyhow::bail!($error_msg); + } + }, } } }}; ($name:literal, $context:expr, $error_msg:literal) => {{ if cfg!(feature = "testing") { - if let $crate::FailpointResult::Return(_) = - $crate::failpoint($name, Some($context)).await - { - anyhow::bail!($error_msg); + match $crate::failpoint($name, Some($context)) { + either::Either::Left(result) => { + if let $crate::FailpointResult::Return(_) = result { + anyhow::bail!($error_msg); + } + }, + either::Either::Right(future) => { + if let $crate::FailpointResult::Return(_) = future.await { + anyhow::bail!($error_msg); + } + }, + } + } + }}; +} + +/// Macro for failpoint calls that might bail with an error - sync version +#[macro_export] +macro_rules! failpoint_bail_sync { + ($name:literal, $error_msg:literal) => {{ + if cfg!(feature = "testing") { + match $crate::failpoint($name, None) { + either::Either::Left(result) => { + if let $crate::FailpointResult::Return(_) = result { + anyhow::bail!($error_msg); + } + }, + either::Either::Right(_) => { + panic!("failpoint '{}' triggered an async action (sleep/pause) but failpoint_bail_sync! was used. Use failpoint_bail! instead.", $name); + }, + } + } + }}; + ($name:literal, $context:expr, $error_msg:literal) => {{ + if cfg!(feature = "testing") { + match $crate::failpoint($name, Some($context)) { + either::Either::Left(result) => { + if let $crate::FailpointResult::Return(_) = result { + anyhow::bail!($error_msg); + } + }, + either::Either::Right(_) => { + panic!("failpoint '{}' triggered an async action (sleep/pause) but failpoint_bail_sync! was used. Use failpoint_bail! instead.", $name); + }, } } }}; @@ -183,9 +504,13 @@ macro_rules! failpoint_bail { // Re-export for convenience pub use fail_point; +pub use fail_point_sync; pub use fail_point_with_context; +pub use fail_point_with_context_sync; pub use failpoint_bail; +pub use failpoint_bail_sync; pub use failpoint_context; pub use failpoint_return; +pub use failpoint_return_sync; pub use pausable_failpoint; pub use sleep_millis_async; diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml index 6dbaa30fa1..ce4c292c4b 100644 --- a/libs/utils/Cargo.toml +++ b/libs/utils/Cargo.toml @@ -21,6 +21,7 @@ bytes.workspace = true camino.workspace = true chrono.workspace = true diatomic-waker.workspace = true +either.workspace = true git-version.workspace = true hex = { workspace = true, features = ["serde"] } humantime.workspace = true diff --git a/libs/utils/src/failpoint_support.rs b/libs/utils/src/failpoint_support.rs index b0dc84cb8e..c93490a97a 100644 --- a/libs/utils/src/failpoint_support.rs +++ b/libs/utils/src/failpoint_support.rs @@ -5,38 +5,17 @@ pub use neon_failpoint::{configure_failpoint as apply_failpoint, has_failpoints, init}; use tokio_util::sync::CancellationToken; -/// Declare a failpoint that can use to `pause` failpoint action. -/// This is now a compatibility wrapper around the new neon_failpoint crate. -/// -/// Optionally pass a cancellation token, and this failpoint will drop out of -/// its pause when the cancellation token fires. This is useful for testing -/// cases where we would like to block something, but test its clean shutdown behavior. -/// The macro evaluates to a Result in that case, where Ok(()) is the case -/// where the failpoint was not paused, and Err() is the case where cancellation -/// token fired while evaluating the failpoint. +/// Mere forward to neon_failpoint::pausable_failpoint #[macro_export] macro_rules! pausable_failpoint { - ($name:literal) => {{ - if cfg!(feature = "testing") { - let cancel = ::tokio_util::sync::CancellationToken::new(); - let _ = $crate::pausable_failpoint!($name, &cancel); - } - }}; - ($name:literal, $cancel:expr) => {{ - if cfg!(feature = "testing") { - match ::neon_failpoint::failpoint_with_cancellation($name, None, $cancel).await { - ::neon_failpoint::FailpointResult::Continue => Ok(()), - ::neon_failpoint::FailpointResult::Return(_) => Ok(()), - ::neon_failpoint::FailpointResult::Cancelled => Err(()), - } - } else { - Ok(()) - } - }}; + ($name:literal) => { + ::neon_failpoint::pausable_failpoint!($name) + }; + ($name:literal, $cancel:expr) => { + ::neon_failpoint::pausable_failpoint!($name, $cancel) + }; } -pub use pausable_failpoint; - /// use with neon_failpoint::configure_failpoint("$name", "sleep(2000)") /// /// The effect is similar to a "sleep(2000)" action, i.e. we sleep for the diff --git a/pageserver/src/consumption_metrics/disk_cache.rs b/pageserver/src/consumption_metrics/disk_cache.rs index 49e3a02459..14774fcc3d 100644 --- a/pageserver/src/consumption_metrics/disk_cache.rs +++ b/pageserver/src/consumption_metrics/disk_cache.rs @@ -131,7 +131,7 @@ pub(super) async fn flush_metrics_to_disk( tempfile.flush()?; tempfile.as_file().sync_all()?; - fail::fail_point!("before-persist-last-metrics-collected"); + fail::fail_point_sync!("before-persist-last-metrics-collected"); drop(tempfile.persist(&*path).map_err(|e| e.error)?); diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index c1443a67b0..eb01fea095 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -336,7 +336,7 @@ async fn page_service_conn_main( let default_timeout_ms = 10 * 60 * 1000; // 10 minutes by default let socket_timeout_ms = (|| { - fail::fail_point!("simulated-bad-compute-connection", |avg_timeout_ms| { + fail::fail_point_sync!("simulated-bad-compute-connection", |avg_timeout_ms| { // Exponential distribution for simulating // poor network conditions, expect about avg_timeout_ms to be around 15 // in tests @@ -3044,7 +3044,7 @@ where _pgb: &mut PostgresBackend, sm: &FeStartupPacket, ) -> Result<(), QueryError> { - fail::fail_point!("ps::connection-start::startup-packet"); + fail::fail_point_sync!("ps::connection-start::startup-packet"); if let FeStartupPacket::StartupMessage { params, .. } = sm { if let Some(app_name) = params.get("application_name") { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 8db15afabf..3cd34fc1f1 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -5184,7 +5184,7 @@ impl Timeline { *self.applied_gc_cutoff_lsn.read(), ); - fail_point!("checkpoint-before-saving-metadata", |x| bail!( + neon_failpoint::fail_point_sync!("checkpoint-before-saving-metadata", |x| bail!( "{}", x.unwrap() )); diff --git a/pageserver/src/tenant/timeline/detach_ancestor.rs b/pageserver/src/tenant/timeline/detach_ancestor.rs index f89671e86f..f6bf84b710 100644 --- a/pageserver/src/tenant/timeline/detach_ancestor.rs +++ b/pageserver/src/tenant/timeline/detach_ancestor.rs @@ -1114,7 +1114,7 @@ pub(super) async fn detach_and_reparent( // others will fail as if those timelines had been stopped for whatever reason. #[cfg(feature = "testing")] let failpoint_sem = || -> Option> { - fail::fail_point!("timeline-detach-ancestor::allow_one_reparented", |_| Some( + fail::fail_point_sync!("timeline-detach-ancestor::allow_one_reparented", |_| Some( Arc::new(Semaphore::new(1)) )); None