audit for failpoint usage, implement missing ops

This commit is contained in:
Christian Schwarz
2025-07-11 14:00:23 +02:00
parent 27dc11f5cc
commit 415cdff336
5 changed files with 427 additions and 127 deletions

1
Cargo.lock generated
View File

@@ -3859,6 +3859,7 @@ dependencies = [
"anyhow",
"once_cell",
"parking_lot 0.12.1",
"rand 0.8.5",
"regex",
"serde",
"tokio",

View File

@@ -12,6 +12,7 @@ anyhow = { workspace = true }
regex = { workspace = true }
once_cell = { workspace = true }
parking_lot = { workspace = true }
rand = { workspace = true }
[dev-dependencies]
tracing-subscriber = { workspace = true, features = ["fmt"] }

View File

@@ -19,6 +19,28 @@ A modern, async-first failpoint library for Neon, replacing the `fail` crate wit
- `return` - Return early (empty value)
- `return(value)` - Return early with a specific value
- `exit` - Exit the process immediately
- `panic(message)` - Panic the process with a custom message
- `N%return(value)` - Return with a specific value N% of the time (probability-based)
- `N%M*return(value)` - Return with a specific value N% of the time, maximum M times
- `N%action` - Execute any action N% of the time (probability-based)
- `N%M*action` - Execute any action N% of the time, maximum M times
## Probability-Based Actions
The library supports probability-based failpoints that trigger only a percentage of the time:
```rust
// 50% chance to return a value
configure_failpoint("random_failure", "50%return(error)").unwrap();
// 10% chance to sleep, maximum 3 times
configure_failpoint("occasional_delay", "10%3*sleep(1000)").unwrap();
// 25% chance to panic
configure_failpoint("rare_panic", "25%panic(critical error)").unwrap();
```
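Each failpoint keeps a counter of how many times its probability-based action has actually triggered. When a maximum count `M` is given (`N%M*action`), the failpoint stops triggering once the counter reaches `M`; hits where the probability roll does not trigger do not count toward the limit.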
## Dynamic Behavior

View File

@@ -26,7 +26,7 @@ async fn main() {
println!("Testing with matching context...");
match failpoint("backup_operation", Some(&context)).await {
FailpointResult::Return(value) => {
println!("Failpoint triggered with value: {}", value);
println!("Failpoint triggered with value: {:?}", value);
}
FailpointResult::Continue => {
println!("Failpoint not triggered");
@@ -45,7 +45,7 @@ async fn main() {
println!("Testing with non-matching context...");
match failpoint("backup_operation", Some(&context)).await {
FailpointResult::Return(value) => {
println!("Failpoint triggered with value: {}", value);
println!("Failpoint triggered with value: {:?}", value);
}
FailpointResult::Continue => {
println!("Failpoint not triggered (expected)");

View File

@@ -13,6 +13,7 @@ use std::time::Duration;
use anyhow::Result;
use once_cell::sync::Lazy;
use parking_lot::RwLock;
use rand::Rng;
use regex::Regex;
use serde::{Deserialize, Serialize};
use tokio::sync::Notify;
@@ -27,12 +28,25 @@ static FAILPOINTS: Lazy<Arc<RwLock<HashMap<String, FailpointConfig>>>> =
/// Configuration for a single failpoint
#[derive(Debug, Clone)]
pub struct FailpointConfig {
/// The action to take when the failpoint is hit
pub action: FailpointAction,
/// The action specification including probability
pub action_spec: FailpointActionSpec,
/// Optional context matching rules
pub context_matchers: Option<HashMap<String, String>>,
/// Notify objects for tasks waiting on this failpoint
pub notifiers: Vec<Arc<Notify>>,
/// Counter for probability-based actions
pub trigger_count: u32,
}
/// Specification for a failpoint action with probability
///
/// Bundles the action to run with the optional gating parsed from the
/// `N%action` / `N%M*action` prefix of a failpoint action string.
#[derive(Debug, Clone)]
pub struct FailpointActionSpec {
/// Probability as a percentage (0-100), None means always trigger (100%)
pub probability: Option<u8>,
/// Maximum number of times to trigger (None = unlimited)
///
/// NOTE(review): the visible evaluation code consults this limit only inside
/// the `probability` check, so it appears to have no effect when
/// `probability` is `None` — confirm against the full file.
pub max_count: Option<u32>,
/// The actual action to execute
pub action: FailpointAction,
}
/// Actions that can be taken when a failpoint is hit
@@ -48,6 +62,8 @@ pub enum FailpointAction {
Return(Option<String>),
/// Exit the process immediately
Exit,
/// Panic the process with a message
Panic(String),
}
/// Context information passed to failpoints
@@ -78,11 +94,12 @@ pub fn init() -> Result<()> {
/// Configure a failpoint with the given action string
pub fn configure_failpoint(name: &str, actions: &str) -> Result<()> {
let action = parse_action(actions)?;
let action_spec = parse_action_spec(actions)?;
let config = FailpointConfig {
action,
action_spec,
context_matchers: None,
notifiers: Vec::new(),
trigger_count: 0,
};
let mut failpoints = FAILPOINTS.write();
@@ -107,11 +124,12 @@ pub fn configure_failpoint_with_context(
actions: &str,
context_matchers: HashMap<String, String>,
) -> Result<()> {
let action = parse_action(actions)?;
let action_spec = parse_action_spec(actions)?;
let config = FailpointConfig {
action,
action_spec,
context_matchers: Some(context_matchers),
notifiers: Vec::new(),
trigger_count: 0,
};
let mut failpoints = FAILPOINTS.write();
@@ -183,124 +201,197 @@ pub async fn failpoint_with_cancellation(
}
}
// Check probability and max_count
if let Some(probability) = config.action_spec.probability {
// Check if we've hit the max count
if let Some(max_count) = config.action_spec.max_count {
if config.trigger_count >= max_count {
return FailpointResult::Continue;
}
}
// Check probability
let mut rng = rand::thread_rng();
let roll: u8 = rng.gen_range(1..=100);
if roll > probability {
return FailpointResult::Continue;
}
// Increment trigger count
{
let mut failpoints = FAILPOINTS.write();
if let Some(fp_config) = failpoints.get_mut(name) {
fp_config.trigger_count += 1;
}
}
}
tracing::info!("Hit failpoint: {}", name);
match config.action {
FailpointAction::Off => FailpointResult::Continue,
FailpointAction::Pause => {
tracing::info!("Failpoint {} pausing", name);
// Create a notifier for this task
let notifier = Arc::new(Notify::new());
// Add the notifier to the failpoint configuration
{
let mut failpoints = FAILPOINTS.write();
if let Some(fp_config) = failpoints.get_mut(name) {
fp_config.notifiers.push(notifier.clone());
}
}
// Create a cleanup guard to remove the notifier when this task completes
let cleanup_guard = NotifierCleanupGuard {
failpoint_name: name.to_string(),
notifier: notifier.clone(),
};
// Wait for either cancellation or notification
let result = tokio::select! {
_ = cancel_token.cancelled() => {
tracing::info!("Failpoint {} cancelled", name);
FailpointResult::Cancelled
}
_ = notifier.notified() => {
tracing::info!("Failpoint {} unpaused, resuming", name);
FailpointResult::Continue
}
};
// Cleanup happens automatically when cleanup_guard is dropped
drop(cleanup_guard);
result
}
FailpointAction::Sleep(millis) => {
let duration = Duration::from_millis(millis);
tracing::info!("Failpoint {} sleeping for {:?}", name, duration);
// Create a notifier for this task
let notifier = Arc::new(Notify::new());
// Add the notifier to the failpoint configuration
{
let mut failpoints = FAILPOINTS.write();
if let Some(fp_config) = failpoints.get_mut(name) {
fp_config.notifiers.push(notifier.clone());
}
}
// Create a cleanup guard to remove the notifier when this task completes
let cleanup_guard = NotifierCleanupGuard {
failpoint_name: name.to_string(),
notifier: notifier.clone(),
};
let result = tokio::select! {
_ = tokio::time::sleep(duration) => {
tracing::info!("Failpoint {} sleep completed", name);
FailpointResult::Continue
}
_ = cancel_token.cancelled() => {
tracing::info!("Failpoint {} sleep cancelled", name);
FailpointResult::Cancelled
}
_ = notifier.notified() => {
tracing::info!("Failpoint {} sleep interrupted, resuming", name);
FailpointResult::Continue
}
};
// Cleanup happens automatically when cleanup_guard is dropped
drop(cleanup_guard);
result
}
FailpointAction::Return(value) => {
tracing::info!("Failpoint {} returning: {:?}", name, value);
FailpointResult::Return(value)
}
FailpointAction::Exit => {
tracing::info!("Failpoint {} exiting process", name);
std::process::exit(1);
}
}
execute_action(name, &config.action_spec, context, cancel_token).await
}
/// Parse an action string into a FailpointAction
fn parse_action(actions: &str) -> Result<FailpointAction> {
match actions {
"off" => Ok(FailpointAction::Off),
"pause" => Ok(FailpointAction::Pause),
"exit" => Ok(FailpointAction::Exit),
"return" => Ok(FailpointAction::Return(None)),
_ => {
// Try to parse return(value) format
if let Some(captures) = regex::Regex::new(r"^return\(([^)]*)\)$")?.captures(actions) {
let value = captures.get(1).unwrap().as_str().to_string();
Ok(FailpointAction::Return(Some(value)))
/// Execute a specific action (used for recursive execution in probability-based actions)
fn execute_action<'a>(
name: &'a str,
action_spec: &'a FailpointActionSpec,
_context: Option<&'a FailpointContext>,
cancel_token: &'a CancellationToken,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = FailpointResult> + Send + 'a>> {
Box::pin(async move {
match &action_spec.action {
FailpointAction::Off => FailpointResult::Continue,
FailpointAction::Return(value) => {
tracing::info!("Failpoint {} returning: {:?}", name, value);
FailpointResult::Return(value.clone())
}
// Try to parse sleep(millis) format
else if let Some(captures) = regex::Regex::new(r"^sleep\((\d+)\)$")?.captures(actions) {
let millis = captures.get(1).unwrap().as_str().parse::<u64>()?;
Ok(FailpointAction::Sleep(millis))
FailpointAction::Sleep(millis) => {
tracing::info!("Failpoint {} sleeping for {}ms", name, millis);
// Create a notifier for this task
let notifier = Arc::new(Notify::new());
// Add the notifier to the failpoint configuration
{
let mut failpoints = FAILPOINTS.write();
if let Some(fp_config) = failpoints.get_mut(name) {
fp_config.notifiers.push(notifier.clone());
}
}
// Create cleanup guard to remove notifier when done
let _guard = NotifierCleanupGuard {
failpoint_name: name.to_string(),
notifier: notifier.clone(),
};
// Sleep with cancellation support
tokio::select! {
_ = tokio::time::sleep(Duration::from_millis(*millis)) => {
tracing::info!("Failpoint {} finished sleeping", name);
FailpointResult::Continue
}
_ = cancel_token.cancelled() => {
tracing::info!("Failpoint {} sleep cancelled", name);
FailpointResult::Cancelled
}
_ = notifier.notified() => {
tracing::info!("Failpoint {} sleep interrupted by configuration change", name);
FailpointResult::Continue
}
}
}
// For backward compatibility, treat numeric values as sleep duration
else if let Ok(millis) = actions.parse::<u64>() {
Ok(FailpointAction::Sleep(millis))
FailpointAction::Pause => {
tracing::info!("Failpoint {} pausing", name);
// Create a notifier for this task
let notifier = Arc::new(Notify::new());
// Add the notifier to the failpoint configuration
{
let mut failpoints = FAILPOINTS.write();
if let Some(fp_config) = failpoints.get_mut(name) {
fp_config.notifiers.push(notifier.clone());
}
}
// Create cleanup guard to remove notifier when done
let _guard = NotifierCleanupGuard {
failpoint_name: name.to_string(),
notifier: notifier.clone(),
};
// Wait until cancelled or notified
tokio::select! {
_ = cancel_token.cancelled() => {
tracing::info!("Failpoint {} pause cancelled", name);
FailpointResult::Cancelled
}
_ = notifier.notified() => {
tracing::info!("Failpoint {} pause ended due to configuration change", name);
FailpointResult::Continue
}
}
}
else {
anyhow::bail!("Invalid failpoint action: {}", actions);
FailpointAction::Exit => {
tracing::info!("Failpoint {} exiting process", name);
std::process::exit(1);
}
FailpointAction::Panic(message) => {
tracing::error!("Failpoint {} panicking with message: {}", name, message);
panic!("Failpoint panicked: {}", message);
}
}
})
}
/// Parse an action string into a FailpointActionSpec
fn parse_action_spec(actions: &str) -> Result<FailpointActionSpec> {
// Regex patterns for different probability formats
let prob_count_action = regex::Regex::new(r"^(\d+)%(\d+)\*(.+)$")?;
let prob_action = regex::Regex::new(r"^(\d+)%(.+)$")?;
let return_match = regex::Regex::new(r"^return\(([^)]*)\)$")?;
let sleep_match = regex::Regex::new(r"^sleep\((\d+)\)$")?;
let panic_match = regex::Regex::new(r"^panic\(([^)]*)\)$")?;
let mut probability: Option<u8> = None;
let mut max_count: Option<u32> = None;
let mut action_str = actions;
// Check for probability with count: "10%3*return(3000)"
if let Some(captures) = prob_count_action.captures(actions) {
probability = Some(captures.get(1).unwrap().as_str().parse::<u8>()?);
max_count = Some(captures.get(2).unwrap().as_str().parse::<u32>()?);
action_str = captures.get(3).unwrap().as_str();
}
// Check for probability without count: "50%return(15)"
else if let Some(captures) = prob_action.captures(actions) {
probability = Some(captures.get(1).unwrap().as_str().parse::<u8>()?);
action_str = captures.get(2).unwrap().as_str();
}
// Parse the action part
let action = if action_str == "off" {
FailpointAction::Off
} else if action_str == "pause" {
FailpointAction::Pause
} else if action_str == "return" {
FailpointAction::Return(None)
} else if action_str == "exit" {
FailpointAction::Exit
} else if let Some(captures) = return_match.captures(action_str) {
let value = captures.get(1).unwrap().as_str();
if value.is_empty() {
FailpointAction::Return(None)
} else {
FailpointAction::Return(Some(value.to_string()))
}
} else if let Some(captures) = sleep_match.captures(action_str) {
let millis = captures.get(1).unwrap().as_str().parse::<u64>()?;
FailpointAction::Sleep(millis)
} else if let Some(captures) = panic_match.captures(action_str) {
let message = captures.get(1).unwrap().as_str().to_string();
FailpointAction::Panic(message)
} else {
// For backward compatibility, treat numeric values as sleep duration
if let Ok(millis) = action_str.parse::<u64>() {
FailpointAction::Sleep(millis)
} else {
anyhow::bail!("Invalid failpoint action: {}", action_str);
}
};
if let Some(p) = probability {
if p > 100 {
anyhow::bail!("Probability must be between 0 and 100, got {}", p);
}
}
Ok(FailpointActionSpec {
probability,
max_count,
action,
})
}
/// Check if the given context matches the matchers
@@ -357,7 +448,7 @@ mod tests {
async fn test_failpoint_return() {
configure_failpoint("test_return", "return(42)").unwrap();
let result = failpoint("test_return", None).await;
if let FailpointResult::Return(value) = result {
if let FailpointResult::Return(Some(value)) = result {
assert_eq!(value, "42");
} else {
panic!("Expected return result");
@@ -597,24 +688,209 @@ mod tests {
#[tokio::test]
async fn test_context_matching() {
let mut context_matchers = HashMap::new();
context_matchers.insert("tenant_id".to_string(), "test_.*".to_string());
configure_failpoint_with_context("test_context", "return(matched)", context_matchers).unwrap();
let mut context = HashMap::new();
context.insert("tenant_id".to_string(), "test_123".to_string());
context.insert("user_id".to_string(), "123".to_string());
context.insert("operation".to_string(), "read".to_string());
let mut matchers = HashMap::new();
matchers.insert("user_id".to_string(), "123".to_string());
matchers.insert("operation".to_string(), "read".to_string());
configure_failpoint_with_context("test_context", "return(matched)", matchers).unwrap();
let result = failpoint("test_context", Some(&context)).await;
if let FailpointResult::Return(value) = result {
if let FailpointResult::Return(Some(value)) = result {
assert_eq!(value, "matched");
} else {
panic!("Expected return result");
}
// Test non-matching context
context.insert("tenant_id".to_string(), "other_123".to_string());
let result = failpoint("test_context", Some(&context)).await;
// Test with non-matching context
let mut wrong_context = HashMap::new();
wrong_context.insert("user_id".to_string(), "456".to_string());
wrong_context.insert("operation".to_string(), "write".to_string());
let result = failpoint("test_context", Some(&wrong_context)).await;
matches!(result, FailpointResult::Continue);
}
/// A `panic(message)` failpoint must unwind with a payload containing the
/// configured message.
#[tokio::test]
async fn test_failpoint_panic() {
    configure_failpoint("test_panic", "panic(test message)").unwrap();

    // Hit the failpoint on a blocking thread with its own runtime so the
    // unwind can be caught without tearing down the test's runtime.
    let hit_failpoint = || {
        std::panic::catch_unwind(|| {
            let rt = tokio::runtime::Runtime::new().unwrap();
            rt.block_on(async { failpoint("test_panic", None).await })
        })
    };
    let result = tokio::task::spawn_blocking(hit_failpoint).await.unwrap();

    assert!(result.is_err());
    let panic_msg = result.unwrap_err();

    // The payload is a `String` for formatted panics and a `&str` for
    // literal ones; accept either.
    let text = panic_msg
        .downcast_ref::<String>()
        .map(String::as_str)
        .or_else(|| panic_msg.downcast_ref::<&str>().copied());
    match text {
        Some(msg) => assert!(msg.contains("test message")),
        None => panic!("Expected panic with string message, got: {:?}", panic_msg),
    }
}
/// The extreme probabilities are deterministic: `100%` always triggers and
/// `0%` never does.
#[tokio::test]
async fn test_failpoint_probability_simple() {
    configure_failpoint("test_prob_simple", "100%return(always)").unwrap();
    // 100% probability should always trigger
    let result = failpoint("test_prob_simple", None).await;
    if let FailpointResult::Return(Some(value)) = result {
        assert_eq!(value, "always");
    } else {
        panic!("Expected return result with 100% probability");
    }

    // 0% probability should never trigger
    configure_failpoint("test_prob_never", "0%return(never)").unwrap();
    let result = failpoint("test_prob_never", None).await;
    // A bare `matches!` only produces a bool; it must be asserted to check anything.
    assert!(
        matches!(result, FailpointResult::Continue),
        "Expected continue with 0% probability"
    );
}
/// With `100%2*...` the action triggers on exactly the first two hits and the
/// failpoint is passed through afterwards.
#[tokio::test]
async fn test_failpoint_probability_with_count() {
    configure_failpoint("test_prob_count", "100%2*return(limited)").unwrap();

    // First two calls should trigger (100% probability, max 2 times)
    let result1 = failpoint("test_prob_count", None).await;
    if let FailpointResult::Return(Some(value)) = result1 {
        assert_eq!(value, "limited");
    } else {
        panic!("Expected return result on first call");
    }
    let result2 = failpoint("test_prob_count", None).await;
    if let FailpointResult::Return(Some(value)) = result2 {
        assert_eq!(value, "limited");
    } else {
        panic!("Expected return result on second call");
    }

    // Third call should not trigger (count limit exceeded)
    let result3 = failpoint("test_prob_count", None).await;
    // A bare `matches!` only produces a bool; it must be asserted to check anything.
    assert!(
        matches!(result3, FailpointResult::Continue),
        "Expected continue once the count limit is reached"
    );
}
/// A probability prefix can wrap a non-return action: `100%sleep(10)` sleeps
/// and then lets the caller continue.
#[tokio::test]
async fn test_failpoint_probability_nested_actions() {
    configure_failpoint("test_prob_sleep", "100%sleep(10)").unwrap();

    let start = std::time::Instant::now();
    let result = failpoint("test_prob_sleep", None).await;
    let duration = start.elapsed();

    // A bare `matches!` only produces a bool; it must be asserted to check anything.
    assert!(
        matches!(result, FailpointResult::Continue),
        "Expected continue after the sleep completes"
    );
    assert!(duration >= Duration::from_millis(10));
}
#[tokio::test]
async fn test_parse_action_spec_panic() {
let action_spec = parse_action_spec("panic(test error)").unwrap();
matches!(action_spec.action, FailpointAction::Panic(msg) if msg == "test error");
}
/// Probability prefixes are split off correctly: the percentage, the optional
/// count, and the wrapped action are all parsed into the spec.
#[tokio::test]
async fn test_parse_action_spec_probability() {
    // Test simple probability
    let action_spec = parse_action_spec("50%return(value)").unwrap();
    assert_eq!(action_spec.probability, Some(50));
    assert_eq!(action_spec.max_count, None);
    if let FailpointAction::Return(Some(ref v)) = action_spec.action {
        assert_eq!(v, "value");
    } else {
        panic!("Expected return action");
    }

    // Test probability with count
    let action_spec = parse_action_spec("25%3*return(limited)").unwrap();
    assert_eq!(action_spec.probability, Some(25));
    assert_eq!(action_spec.max_count, Some(3));
    if let FailpointAction::Return(Some(ref v)) = action_spec.action {
        assert_eq!(v, "limited");
    } else {
        panic!("Expected return action");
    }

    // Test probability with other actions
    let action_spec = parse_action_spec("75%sleep(100)").unwrap();
    assert_eq!(action_spec.probability, Some(75));
    assert_eq!(action_spec.max_count, None);
    assert!(
        matches!(action_spec.action, FailpointAction::Sleep(100)),
        "Expected sleep action"
    );
}
/// Malformed specs are rejected by the parser.
#[tokio::test]
async fn test_parse_action_spec_invalid_probability() {
    let rejected_specs = [
        // Probability > 100
        "150%return(invalid)",
        // Valid probability, unknown action
        "50%invalid_action",
    ];
    for spec in rejected_specs {
        assert!(parse_action_spec(spec).is_err());
    }
}
/// Covers the action strings found by the failpoint usage audit, first at the
/// parser level and then end-to-end through `configure_failpoint`/`failpoint`.
#[tokio::test]
async fn test_audit_specific_patterns() {
    // Test patterns found in the audit

    // 1. panic(failpoint) - from test_sharding.py
    let action_spec = parse_action_spec("panic(failpoint)").unwrap();
    // A bare `matches!` only produces a bool; it must be asserted to check anything.
    assert!(matches!(
        action_spec.action,
        FailpointAction::Panic(msg) if msg == "failpoint"
    ));

    // 2. 50%return(15) - from test_bad_connection.py
    let action_spec = parse_action_spec("50%return(15)").unwrap();
    assert_eq!(action_spec.probability, Some(50));
    assert_eq!(action_spec.max_count, None);
    if let FailpointAction::Return(Some(ref v)) = action_spec.action {
        assert_eq!(v, "15");
    } else {
        panic!("Expected return action");
    }

    // 3. 10%3*return(3000) - from test_bad_connection.py
    let action_spec = parse_action_spec("10%3*return(3000)").unwrap();
    assert_eq!(action_spec.probability, Some(10));
    assert_eq!(action_spec.max_count, Some(3));
    if let FailpointAction::Return(Some(ref v)) = action_spec.action {
        assert_eq!(v, "3000");
    } else {
        panic!("Expected return action");
    }

    // 4. 10%return(60000) - from test_bad_connection.py
    let action_spec = parse_action_spec("10%return(60000)").unwrap();
    assert_eq!(action_spec.probability, Some(10));
    assert_eq!(action_spec.max_count, None);
    if let FailpointAction::Return(Some(ref v)) = action_spec.action {
        assert_eq!(v, "60000");
    } else {
        panic!("Expected return action");
    }

    // Test that these patterns work in practice
    // The panic failpoint is only configured (to confirm the spec is accepted);
    // it is deliberately never hit here.
    configure_failpoint("test_audit_panic", "panic(storage controller crash)").unwrap();
    configure_failpoint("test_audit_prob", "100%return(42)").unwrap();
    configure_failpoint("test_audit_count", "100%1*return(limited)").unwrap();

    // Test the probability-based return
    let result = failpoint("test_audit_prob", None).await;
    if let FailpointResult::Return(Some(value)) = result {
        assert_eq!(value, "42");
    } else {
        panic!("Expected return result");
    }

    // Test the count-limited probability
    let result1 = failpoint("test_audit_count", None).await;
    if let FailpointResult::Return(Some(value)) = result1 {
        assert_eq!(value, "limited");
    } else {
        panic!("Expected return result on first call");
    }

    // Second call should not trigger (count limit exceeded)
    let result2 = failpoint("test_audit_count", None).await;
    assert!(
        matches!(result2, FailpointResult::Continue),
        "Expected continue once the count limit is reached"
    );
}
}