From 415cdff336c3108aef24173e74aadf099a5837da Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 11 Jul 2025 14:00:23 +0200 Subject: [PATCH] audit for failpoint usage, implement missing ops --- Cargo.lock | 1 + libs/neon_failpoint/Cargo.toml | 1 + libs/neon_failpoint/README.md | 22 + libs/neon_failpoint/examples/context_demo.rs | 4 +- libs/neon_failpoint/src/lib.rs | 526 ++++++++++++++----- 5 files changed, 427 insertions(+), 127 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 37baf64048..fe30d5246f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3859,6 +3859,7 @@ dependencies = [ "anyhow", "once_cell", "parking_lot 0.12.1", + "rand 0.8.5", "regex", "serde", "tokio", diff --git a/libs/neon_failpoint/Cargo.toml b/libs/neon_failpoint/Cargo.toml index b283f232e4..9936d72a1a 100644 --- a/libs/neon_failpoint/Cargo.toml +++ b/libs/neon_failpoint/Cargo.toml @@ -12,6 +12,7 @@ anyhow = { workspace = true } regex = { workspace = true } once_cell = { workspace = true } parking_lot = { workspace = true } +rand = { workspace = true } [dev-dependencies] tracing-subscriber = { workspace = true, features = ["fmt"] } diff --git a/libs/neon_failpoint/README.md b/libs/neon_failpoint/README.md index 905e2bf310..8f525e7ab8 100644 --- a/libs/neon_failpoint/README.md +++ b/libs/neon_failpoint/README.md @@ -19,6 +19,28 @@ A modern, async-first failpoint library for Neon, replacing the `fail` crate wit - `return` - Return early (empty value) - `return(value)` - Return early with a specific value - `exit` - Exit the process immediately +- `panic(message)` - Panic the process with a custom message +- `N%return(value)` - Return with a specific value N% of the time (probability-based) +- `N%M*return(value)` - Return with a specific value N% of the time, maximum M times +- `N%action` - Execute any action N% of the time (probability-based) +- `N%M*action` - Execute any action N% of the time, maximum M times + +## Probability-Based Actions + +The library supports probability-based failpoints that trigger only a percentage of the time: + +```rust +// 50% chance to return a value +configure_failpoint("random_failure", "50%return(error)").unwrap(); + +// 10% chance to sleep, maximum 3 times +configure_failpoint("occasional_delay", "10%3*sleep(1000)").unwrap(); + +// 25% chance to panic +configure_failpoint("rare_panic", "25%panic(critical error)").unwrap(); +``` + +The probability system uses a counter to track how many times a probability-based action has been triggered, allowing for precise control over test scenarios. ## Dynamic Behavior diff --git a/libs/neon_failpoint/examples/context_demo.rs b/libs/neon_failpoint/examples/context_demo.rs index 28876cb1d3..38d3333560 100644 --- a/libs/neon_failpoint/examples/context_demo.rs +++ b/libs/neon_failpoint/examples/context_demo.rs @@ -26,7 +26,7 @@ async fn main() { println!("Testing with matching context..."); match failpoint("backup_operation", Some(&context)).await { FailpointResult::Return(value) => { - println!("Failpoint triggered with value: {}", value); + println!("Failpoint triggered with value: {:?}", value); } FailpointResult::Continue => { println!("Failpoint not triggered"); @@ -45,7 +45,7 @@ async fn main() { println!("Testing with non-matching context..."); match failpoint("backup_operation", Some(&context)).await { FailpointResult::Return(value) => { - println!("Failpoint triggered with value: {}", value); + println!("Failpoint triggered with value: {:?}", value); } FailpointResult::Continue => { println!("Failpoint not triggered (expected)"); diff --git a/libs/neon_failpoint/src/lib.rs b/libs/neon_failpoint/src/lib.rs index 95c412e7e7..a8a407e6ee 100644 --- a/libs/neon_failpoint/src/lib.rs +++ b/libs/neon_failpoint/src/lib.rs @@ -13,6 +13,7 @@ use std::time::Duration; use anyhow::Result; use once_cell::sync::Lazy; use parking_lot::RwLock; +use rand::Rng; use regex::Regex; use serde::{Deserialize, Serialize}; use tokio::sync::Notify; @@ -27,12 +28,25 @@ static FAILPOINTS: Lazy>>> = /// Configuration for a single failpoint #[derive(Debug, Clone)] pub struct FailpointConfig { - /// The action to take when the failpoint is hit - pub action: FailpointAction, + /// The action specification including probability + pub action_spec: FailpointActionSpec, /// Optional context matching rules pub context_matchers: Option>, /// Notify objects for tasks waiting on this failpoint pub notifiers: Vec>, + /// Counter for probability-based actions + pub trigger_count: u32, +} + +/// Specification for a failpoint action with probability +#[derive(Debug, Clone)] +pub struct FailpointActionSpec { + /// Probability as a percentage (0-100), None means always trigger (100%) + pub probability: Option, + /// Maximum number of times to trigger (None = unlimited) + pub max_count: Option, + /// The actual action to execute + pub action: FailpointAction, } /// Actions that can be taken when a failpoint is hit @@ -48,6 +62,8 @@ pub enum FailpointAction { Return(Option), /// Exit the process immediately Exit, + /// Panic the process with a message + Panic(String), } /// Context information passed to failpoints @@ -78,11 +94,12 @@ pub fn init() -> Result<()> { /// Configure a failpoint with the given action string pub fn configure_failpoint(name: &str, actions: &str) -> Result<()> { - let action = parse_action(actions)?; + let action_spec = parse_action_spec(actions)?; let config = FailpointConfig { - action, + action_spec, context_matchers: None, notifiers: Vec::new(), + trigger_count: 0, }; let mut failpoints = FAILPOINTS.write(); @@ -107,11 +124,12 @@ pub fn configure_failpoint_with_context( actions: &str, context_matchers: HashMap, ) -> Result<()> { - let action = parse_action(actions)?; + let action_spec = parse_action_spec(actions)?; let config = FailpointConfig { - action, + action_spec, context_matchers: Some(context_matchers), notifiers: Vec::new(), + trigger_count: 0, }; let mut failpoints = FAILPOINTS.write(); @@ -183,124 +201,197 @@ pub async fn failpoint_with_cancellation( } } + // Check probability and max_count + if let Some(probability) = config.action_spec.probability { + // Check if we've hit the max count + if let Some(max_count) = config.action_spec.max_count { + if config.trigger_count >= max_count { + return FailpointResult::Continue; + } + } + + // Check probability + let mut rng = rand::thread_rng(); + let roll: u8 = rng.gen_range(1..=100); + if roll > probability { + return FailpointResult::Continue; + } + + // Increment trigger count + { + let mut failpoints = FAILPOINTS.write(); + if let Some(fp_config) = failpoints.get_mut(name) { + fp_config.trigger_count += 1; + } + } + } + tracing::info!("Hit failpoint: {}", name); - match config.action { - FailpointAction::Off => FailpointResult::Continue, - FailpointAction::Pause => { - tracing::info!("Failpoint {} pausing", name); - - // Create a notifier for this task - let notifier = Arc::new(Notify::new()); - - // Add the notifier to the failpoint configuration - { - let mut failpoints = FAILPOINTS.write(); - if let Some(fp_config) = failpoints.get_mut(name) { - fp_config.notifiers.push(notifier.clone()); - } - } - - // Create a cleanup guard to remove the notifier when this task completes - let cleanup_guard = NotifierCleanupGuard { - failpoint_name: name.to_string(), - notifier: notifier.clone(), - }; - - // Wait for either cancellation or notification - let result = tokio::select! { - _ = cancel_token.cancelled() => { - tracing::info!("Failpoint {} cancelled", name); - FailpointResult::Cancelled - } - _ = notifier.notified() => { - tracing::info!("Failpoint {} unpaused, resuming", name); - FailpointResult::Continue - } - }; - - // Cleanup happens automatically when cleanup_guard is dropped - drop(cleanup_guard); - result - } - FailpointAction::Sleep(millis) => { - let duration = Duration::from_millis(millis); - tracing::info!("Failpoint {} sleeping for {:?}", name, duration); - - // Create a notifier for this task - let notifier = Arc::new(Notify::new()); - - // Add the notifier to the failpoint configuration - { - let mut failpoints = FAILPOINTS.write(); - if let Some(fp_config) = failpoints.get_mut(name) { - fp_config.notifiers.push(notifier.clone()); - } - } - - // Create a cleanup guard to remove the notifier when this task completes - let cleanup_guard = NotifierCleanupGuard { - failpoint_name: name.to_string(), - notifier: notifier.clone(), - }; - - let result = tokio::select! { - _ = tokio::time::sleep(duration) => { - tracing::info!("Failpoint {} sleep completed", name); - FailpointResult::Continue - } - _ = cancel_token.cancelled() => { - tracing::info!("Failpoint {} sleep cancelled", name); - FailpointResult::Cancelled - } - _ = notifier.notified() => { - tracing::info!("Failpoint {} sleep interrupted, resuming", name); - FailpointResult::Continue - } - }; - - // Cleanup happens automatically when cleanup_guard is dropped - drop(cleanup_guard); - result - } - FailpointAction::Return(value) => { - tracing::info!("Failpoint {} returning: {:?}", name, value); - FailpointResult::Return(value) - } - FailpointAction::Exit => { - tracing::info!("Failpoint {} exiting process", name); - std::process::exit(1); - } - } + execute_action(name, &config.action_spec, context, cancel_token).await } -/// Parse an action string into a FailpointAction -fn parse_action(actions: &str) -> Result { - match actions { - "off" => Ok(FailpointAction::Off), - "pause" => Ok(FailpointAction::Pause), - "exit" => Ok(FailpointAction::Exit), - "return" => Ok(FailpointAction::Return(None)), - _ => { - // Try to parse return(value) format - if let Some(captures) = regex::Regex::new(r"^return\(([^)]*)\)$")?.captures(actions) { - let value = captures.get(1).unwrap().as_str().to_string(); - Ok(FailpointAction::Return(Some(value))) +/// Execute a specific action (used for recursive execution in probability-based actions) +fn execute_action<'a>( + name: &'a str, + action_spec: &'a FailpointActionSpec, + _context: Option<&'a FailpointContext>, + cancel_token: &'a CancellationToken, +) -> std::pin::Pin + Send + 'a>> { + Box::pin(async move { + match &action_spec.action { + FailpointAction::Off => FailpointResult::Continue, + FailpointAction::Return(value) => { + tracing::info!("Failpoint {} returning: {:?}", name, value); + FailpointResult::Return(value.clone()) } - // Try to parse sleep(millis) format - else if let Some(captures) = regex::Regex::new(r"^sleep\((\d+)\)$")?.captures(actions) { - let millis = captures.get(1).unwrap().as_str().parse::()?; - Ok(FailpointAction::Sleep(millis)) + FailpointAction::Sleep(millis) => { + tracing::info!("Failpoint {} sleeping for {}ms", name, millis); + + // Create a notifier for this task + let notifier = Arc::new(Notify::new()); + + // Add the notifier to the failpoint configuration + { + let mut failpoints = FAILPOINTS.write(); + if let Some(fp_config) = failpoints.get_mut(name) { + fp_config.notifiers.push(notifier.clone()); + } + } + + // Create cleanup guard to remove notifier when done + let _guard = NotifierCleanupGuard { + failpoint_name: name.to_string(), + notifier: notifier.clone(), + }; + + // Sleep with cancellation support + tokio::select! { + _ = tokio::time::sleep(Duration::from_millis(*millis)) => { + tracing::info!("Failpoint {} finished sleeping", name); + FailpointResult::Continue + } + _ = cancel_token.cancelled() => { + tracing::info!("Failpoint {} sleep cancelled", name); + FailpointResult::Cancelled + } + _ = notifier.notified() => { + tracing::info!("Failpoint {} sleep interrupted by configuration change", name); + FailpointResult::Continue + } + } } - // For backward compatibility, treat numeric values as sleep duration - else if let Ok(millis) = actions.parse::() { - Ok(FailpointAction::Sleep(millis)) + FailpointAction::Pause => { + tracing::info!("Failpoint {} pausing", name); + + // Create a notifier for this task + let notifier = Arc::new(Notify::new()); + + // Add the notifier to the failpoint configuration + { + let mut failpoints = FAILPOINTS.write(); + if let Some(fp_config) = failpoints.get_mut(name) { + fp_config.notifiers.push(notifier.clone()); + } + } + + // Create cleanup guard to remove notifier when done + let _guard = NotifierCleanupGuard { + failpoint_name: name.to_string(), + notifier: notifier.clone(), + }; + + // Wait until cancelled or notified + tokio::select! { + _ = cancel_token.cancelled() => { + tracing::info!("Failpoint {} pause cancelled", name); + FailpointResult::Cancelled + } + _ = notifier.notified() => { + tracing::info!("Failpoint {} pause ended due to configuration change", name); + FailpointResult::Continue + } + } } - else { - anyhow::bail!("Invalid failpoint action: {}", actions); + FailpointAction::Exit => { + tracing::info!("Failpoint {} exiting process", name); + std::process::exit(1); + } + FailpointAction::Panic(message) => { + tracing::error!("Failpoint {} panicking with message: {}", name, message); + panic!("Failpoint panicked: {}", message); } } + }) +} + +/// Parse an action string into a FailpointActionSpec +fn parse_action_spec(actions: &str) -> Result { + // Regex patterns for different probability formats + let prob_count_action = regex::Regex::new(r"^(\d+)%(\d+)\*(.+)$")?; + let prob_action = regex::Regex::new(r"^(\d+)%(.+)$")?; + let return_match = regex::Regex::new(r"^return\(([^)]*)\)$")?; + let sleep_match = regex::Regex::new(r"^sleep\((\d+)\)$")?; + let panic_match = regex::Regex::new(r"^panic\(([^)]*)\)$")?; + + let mut probability: Option = None; + let mut max_count: Option = None; + let mut action_str = actions; + + // Check for probability with count: "10%3*return(3000)" + if let Some(captures) = prob_count_action.captures(actions) { + probability = Some(captures.get(1).unwrap().as_str().parse::()?); + max_count = Some(captures.get(2).unwrap().as_str().parse::()?); + action_str = captures.get(3).unwrap().as_str(); } + // Check for probability without count: "50%return(15)" + else if let Some(captures) = prob_action.captures(actions) { + probability = Some(captures.get(1).unwrap().as_str().parse::()?); + action_str = captures.get(2).unwrap().as_str(); + } + + // Parse the action part + let action = if action_str == "off" { + FailpointAction::Off + } else if action_str == "pause" { + FailpointAction::Pause + } else if action_str == "return" { + FailpointAction::Return(None) + } else if action_str == "exit" { + FailpointAction::Exit + } else if let Some(captures) = return_match.captures(action_str) { + let value = captures.get(1).unwrap().as_str(); + if value.is_empty() { + FailpointAction::Return(None) + } else { + FailpointAction::Return(Some(value.to_string())) + } + } else if let Some(captures) = sleep_match.captures(action_str) { + let millis = captures.get(1).unwrap().as_str().parse::()?; + FailpointAction::Sleep(millis) + } else if let Some(captures) = panic_match.captures(action_str) { + let message = captures.get(1).unwrap().as_str().to_string(); + FailpointAction::Panic(message) + } else { + // For backward compatibility, treat numeric values as sleep duration + if let Ok(millis) = action_str.parse::() { + FailpointAction::Sleep(millis) + } else { + anyhow::bail!("Invalid failpoint action: {}", action_str); + } + }; + + if let Some(p) = probability { + if p > 100 { + anyhow::bail!("Probability must be between 0 and 100, got {}", p); + } + } + + Ok(FailpointActionSpec { + probability, + max_count, + action, + }) } /// Check if the given context matches the matchers @@ -357,7 +448,7 @@ mod tests { async fn test_failpoint_return() { configure_failpoint("test_return", "return(42)").unwrap(); let result = failpoint("test_return", None).await; - if let FailpointResult::Return(value) = result { + if let FailpointResult::Return(Some(value)) = result { assert_eq!(value, "42"); } else { panic!("Expected return result"); @@ -597,24 +688,209 @@ mod tests { #[tokio::test] async fn test_context_matching() { - let mut context_matchers = HashMap::new(); - context_matchers.insert("tenant_id".to_string(), "test_.*".to_string()); - - configure_failpoint_with_context("test_context", "return(matched)", context_matchers).unwrap(); - let mut context = HashMap::new(); - context.insert("tenant_id".to_string(), "test_123".to_string()); + context.insert("user_id".to_string(), "123".to_string()); + context.insert("operation".to_string(), "read".to_string()); + + let mut matchers = HashMap::new(); + matchers.insert("user_id".to_string(), "123".to_string()); + matchers.insert("operation".to_string(), "read".to_string()); + + configure_failpoint_with_context("test_context", "return(matched)", matchers).unwrap(); let result = failpoint("test_context", Some(&context)).await; - if let FailpointResult::Return(value) = result { + if let FailpointResult::Return(Some(value)) = result { assert_eq!(value, "matched"); } else { panic!("Expected return result"); } - // Test non-matching context - context.insert("tenant_id".to_string(), "other_123".to_string()); - let result = failpoint("test_context", Some(&context)).await; + // Test with non-matching context + let mut wrong_context = HashMap::new(); + wrong_context.insert("user_id".to_string(), "456".to_string()); + wrong_context.insert("operation".to_string(), "write".to_string()); + + let result = failpoint("test_context", Some(&wrong_context)).await; matches!(result, FailpointResult::Continue); } + + #[tokio::test] + async fn test_failpoint_panic() { + configure_failpoint("test_panic", "panic(test message)").unwrap(); + + let result = tokio::task::spawn_blocking(|| { + std::panic::catch_unwind(|| { + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async { + failpoint("test_panic", None).await + }) + }) + }).await.unwrap(); + + assert!(result.is_err()); + let panic_msg = result.unwrap_err(); + if let Some(msg) = panic_msg.downcast_ref::() { + assert!(msg.contains("test message")); + } else if let Some(msg) = panic_msg.downcast_ref::<&str>() { + assert!(msg.contains("test message")); + } else { + panic!("Expected panic with string message, got: {:?}", panic_msg); + } + } + + #[tokio::test] + async fn test_failpoint_probability_simple() { + configure_failpoint("test_prob_simple", "100%return(always)").unwrap(); + + // 100% probability should always trigger + let result = failpoint("test_prob_simple", None).await; + if let FailpointResult::Return(Some(value)) = result { + assert_eq!(value, "always"); + } else { + panic!("Expected return result with 100% probability"); + } + + // 0% probability should never trigger + configure_failpoint("test_prob_never", "0%return(never)").unwrap(); + let result = failpoint("test_prob_never", None).await; + matches!(result, FailpointResult::Continue); + } + + #[tokio::test] + async fn test_failpoint_probability_with_count() { + configure_failpoint("test_prob_count", "100%2*return(limited)").unwrap(); + + // First two calls should trigger (100% probability, max 2 times) + let result1 = failpoint("test_prob_count", None).await; + if let FailpointResult::Return(Some(value)) = result1 { + assert_eq!(value, "limited"); + } else { + panic!("Expected return result on first call"); + } + + let result2 = failpoint("test_prob_count", None).await; + if let FailpointResult::Return(Some(value)) = result2 { + assert_eq!(value, "limited"); + } else { + panic!("Expected return result on second call"); + } + + // Third call should not trigger (count limit exceeded) + let result3 = failpoint("test_prob_count", None).await; + matches!(result3, FailpointResult::Continue); + } + + #[tokio::test] + async fn test_failpoint_probability_nested_actions() { + configure_failpoint("test_prob_sleep", "100%sleep(10)").unwrap(); + + let start = std::time::Instant::now(); + let result = failpoint("test_prob_sleep", None).await; + let duration = start.elapsed(); + + matches!(result, FailpointResult::Continue); + assert!(duration >= Duration::from_millis(10)); + } + + #[tokio::test] + async fn test_parse_action_spec_panic() { + let action_spec = parse_action_spec("panic(test error)").unwrap(); + matches!(action_spec.action, FailpointAction::Panic(msg) if msg == "test error"); + } + + #[tokio::test] + async fn test_parse_action_spec_probability() { + // Test simple probability + let action_spec = parse_action_spec("50%return(value)").unwrap(); + if let FailpointAction::Return(Some(ref v)) = action_spec.action { + assert_eq!(v, "value"); + } else { + panic!("Expected return action"); + } + + // Test probability with count + let action_spec = parse_action_spec("25%3*return(limited)").unwrap(); + if let FailpointAction::Return(Some(ref v)) = action_spec.action { + assert_eq!(v, "limited"); + } else { + panic!("Expected return action"); + } + + // Test probability with other actions + let action_spec = parse_action_spec("75%sleep(100)").unwrap(); + if let FailpointAction::Sleep(100) = action_spec.action { + // Test passed + } else { + panic!("Expected sleep action"); + } + } + + #[tokio::test] + async fn test_parse_action_spec_invalid_probability() { + // Test probability > 100 + let result = parse_action_spec("150%return(invalid)"); + assert!(result.is_err()); + + // Test invalid format + let result = parse_action_spec("50%invalid_action"); + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_audit_specific_patterns() { + // Test patterns found in the audit + + // 1. panic(failpoint) - from test_sharding.py + let action_spec = parse_action_spec("panic(failpoint)").unwrap(); + matches!(action_spec.action, FailpointAction::Panic(msg) if msg == "failpoint"); + + // 2. 50%return(15) - from test_bad_connection.py + let action_spec = parse_action_spec("50%return(15)").unwrap(); + if let FailpointAction::Return(Some(ref v)) = action_spec.action { + assert_eq!(v, "15"); + } else { + panic!("Expected return action"); + } + + // 3. 10%3*return(3000) - from test_bad_connection.py + let action_spec = parse_action_spec("10%3*return(3000)").unwrap(); + if let FailpointAction::Return(Some(ref v)) = action_spec.action { + assert_eq!(v, "3000"); + } else { + panic!("Expected return action"); + } + + // 4. 10%return(60000) - from test_bad_connection.py + let action_spec = parse_action_spec("10%return(60000)").unwrap(); + if let FailpointAction::Return(Some(ref v)) = action_spec.action { + assert_eq!(v, "60000"); + } else { + panic!("Expected return action"); + } + + // Test that these patterns work in practice + configure_failpoint("test_audit_panic", "panic(storage controller crash)").unwrap(); + configure_failpoint("test_audit_prob", "100%return(42)").unwrap(); + configure_failpoint("test_audit_count", "100%1*return(limited)").unwrap(); + + // Test the probability-based return + let result = failpoint("test_audit_prob", None).await; + if let FailpointResult::Return(Some(value)) = result { + assert_eq!(value, "42"); + } else { + panic!("Expected return result"); + } + + // Test the count-limited probability + let result1 = failpoint("test_audit_count", None).await; + if let FailpointResult::Return(Some(value)) = result1 { + assert_eq!(value, "limited"); + } else { + panic!("Expected return result on first call"); + } + + // Second call should not trigger (count limit exceeded) + let result2 = failpoint("test_audit_count", None).await; + matches!(result2, FailpointResult::Continue); + } } \ No newline at end of file