This commit is contained in:
Christian Schwarz
2025-07-11 12:29:39 +02:00
parent f29b9737cc
commit 4adc3bdd3a
3 changed files with 358 additions and 8 deletions

View File

@@ -4,7 +4,7 @@ version = "0.1.0"
edition = "2021"
[dependencies]
tokio.workspace = true
tokio = { workspace = true, features = ["time", "sync", "rt-multi-thread"] }
tokio-util = { workspace = true }
tracing = { workspace = true }
serde = { workspace = true, features = ["derive"] }

View File

@@ -8,17 +8,58 @@ A modern, async-first failpoint library for Neon, replacing the `fail` crate wit
- **Context matching**: Failpoints can be configured to trigger only when specific context conditions are met
- **Regex support**: Context values can be matched using regular expressions
- **Cancellation support**: All operations support cancellation tokens
- **Dynamic reconfiguration**: Paused and sleeping tasks automatically resume when failpoint configurations change
- **Backward compatibility**: Drop-in replacement for existing `fail` crate usage
## Supported Actions
- `off` - Disable the failpoint
- `pause` - Pause indefinitely until disabled or cancelled
- `sleep(N)` - Sleep for N milliseconds
- `pause` - Pause indefinitely until disabled, reconfigured, or cancelled
- `sleep(N)` - Sleep for N milliseconds (can be interrupted by reconfiguration)
- `return` - Return early (empty value)
- `return(value)` - Return early with a specific value
- `exit` - Exit the process immediately
## Dynamic Behavior
When a failpoint is reconfigured while tasks are waiting on it:
- **Paused tasks** will immediately resume and continue normal execution
- **Sleeping tasks** will wake up early and continue normal execution
- **Removed failpoints** will cause all waiting tasks to resume normally
The new configuration only applies to future hits of the failpoint, not to tasks that are already waiting. This allows for flexible testing scenarios where you can pause execution, inspect state, and then resume execution dynamically.
## Example: Dynamic Reconfiguration
```rust
use neon_failpoint::{configure_failpoint, failpoint, FailpointResult};
use tokio::time::Duration;
// Start a task that will hit a failpoint
let task = tokio::spawn(async {
println!("About to hit failpoint");
match failpoint("test_pause", None).await {
FailpointResult::Return(value) => println!("Returned: {}", value),
FailpointResult::Continue => println!("Continued normally"),
FailpointResult::Cancelled => println!("Cancelled"),
}
});
// Configure the failpoint to pause
configure_failpoint("test_pause", "pause").unwrap();
// Let the task hit the failpoint and pause
tokio::time::sleep(Duration::from_millis(10)).await;
// Change the failpoint configuration - this will wake up the paused task
// The task will resume and continue normally (not apply the new config)
configure_failpoint("test_pause", "return(not_applied)").unwrap();
// The task will complete with Continue, not Return
let result = task.await.unwrap();
```
## Basic Usage
```rust

View File

@@ -15,6 +15,7 @@ use once_cell::sync::Lazy;
use parking_lot::RwLock;
use regex::Regex;
use serde::{Deserialize, Serialize};
use tokio::sync::Notify;
use tokio_util::sync::CancellationToken;
pub mod macros;
@@ -24,12 +25,14 @@ static FAILPOINTS: Lazy<Arc<RwLock<HashMap<String, FailpointConfig>>>> =
Lazy::new(|| Arc::new(RwLock::new(HashMap::new())));
/// Configuration for a single failpoint
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone)]
pub struct FailpointConfig {
/// The action to take when the failpoint is hit
pub action: FailpointAction,
/// Optional context matching rules
pub context_matchers: Option<HashMap<String, String>>,
/// Notify objects for tasks waiting on this failpoint
pub notifiers: Vec<Arc<Notify>>,
}
/// Actions that can be taken when a failpoint is hit
@@ -79,9 +82,19 @@ pub fn configure_failpoint(name: &str, actions: &str) -> Result<()> {
let config = FailpointConfig {
action,
context_matchers: None,
notifiers: Vec::new(),
};
let mut failpoints = FAILPOINTS.write();
// If this failpoint already exists, notify all waiting tasks
if let Some(existing_config) = failpoints.get(name) {
// Notify all waiting tasks about the configuration change
for notifier in &existing_config.notifiers {
notifier.notify_waiters();
}
}
failpoints.insert(name.to_string(), config);
tracing::info!("Configured failpoint: {} = {}", name, actions);
@@ -98,9 +111,19 @@ pub fn configure_failpoint_with_context(
let config = FailpointConfig {
action,
context_matchers: Some(context_matchers),
notifiers: Vec::new(),
};
let mut failpoints = FAILPOINTS.write();
// If this failpoint already exists, notify all waiting tasks
if let Some(existing_config) = failpoints.get(name) {
// Notify all waiting tasks about the configuration change
for notifier in &existing_config.notifiers {
notifier.notify_waiters();
}
}
failpoints.insert(name.to_string(), config);
tracing::info!("Configured failpoint with context: {} = {}", name, actions);
@@ -110,7 +133,16 @@ pub fn configure_failpoint_with_context(
/// Remove a failpoint configuration
pub fn remove_failpoint(name: &str) {
let mut failpoints = FAILPOINTS.write();
// Notify all waiting tasks before removing
if let Some(existing_config) = failpoints.get(name) {
for notifier in &existing_config.notifiers {
notifier.notify_waiters();
}
}
failpoints.remove(name);
tracing::info!("Removed failpoint: {}", name);
}
@@ -157,14 +189,62 @@ pub async fn failpoint_with_cancellation(
FailpointAction::Off => FailpointResult::Continue,
FailpointAction::Pause => {
tracing::info!("Failpoint {} pausing", name);
cancel_token.cancelled().await;
FailpointResult::Cancelled
// Create a notifier for this task
let notifier = Arc::new(Notify::new());
// Add the notifier to the failpoint configuration
{
let mut failpoints = FAILPOINTS.write();
if let Some(fp_config) = failpoints.get_mut(name) {
fp_config.notifiers.push(notifier.clone());
}
}
// Create a cleanup guard to remove the notifier when this task completes
let cleanup_guard = NotifierCleanupGuard {
failpoint_name: name.to_string(),
notifier: notifier.clone(),
};
// Wait for either cancellation or notification
let result = tokio::select! {
_ = cancel_token.cancelled() => {
tracing::info!("Failpoint {} cancelled", name);
FailpointResult::Cancelled
}
_ = notifier.notified() => {
tracing::info!("Failpoint {} unpaused, resuming", name);
FailpointResult::Continue
}
};
// Cleanup happens automatically when cleanup_guard is dropped
drop(cleanup_guard);
result
}
FailpointAction::Sleep(millis) => {
let duration = Duration::from_millis(millis);
tracing::info!("Failpoint {} sleeping for {:?}", name, duration);
tokio::select! {
// Create a notifier for this task
let notifier = Arc::new(Notify::new());
// Add the notifier to the failpoint configuration
{
let mut failpoints = FAILPOINTS.write();
if let Some(fp_config) = failpoints.get_mut(name) {
fp_config.notifiers.push(notifier.clone());
}
}
// Create a cleanup guard to remove the notifier when this task completes
let cleanup_guard = NotifierCleanupGuard {
failpoint_name: name.to_string(),
notifier: notifier.clone(),
};
let result = tokio::select! {
_ = tokio::time::sleep(duration) => {
tracing::info!("Failpoint {} sleep completed", name);
FailpointResult::Continue
@@ -173,7 +253,15 @@ pub async fn failpoint_with_cancellation(
tracing::info!("Failpoint {} sleep cancelled", name);
FailpointResult::Cancelled
}
}
_ = notifier.notified() => {
tracing::info!("Failpoint {} sleep interrupted, resuming", name);
FailpointResult::Continue
}
};
// Cleanup happens automatically when cleanup_guard is dropped
drop(cleanup_guard);
result
}
FailpointAction::Return(value) => {
tracing::info!("Failpoint {} returning: {}", name, value);
@@ -237,9 +325,26 @@ fn matches_context(matchers: &HashMap<String, String>, context: &FailpointContex
true
}
/// RAII guard that removes a notifier from a failpoint when dropped
struct NotifierCleanupGuard {
failpoint_name: String,
notifier: Arc<Notify>,
}
impl Drop for NotifierCleanupGuard {
fn drop(&mut self) {
let mut failpoints = FAILPOINTS.write();
if let Some(fp_config) = failpoints.get_mut(&self.failpoint_name) {
// Remove this specific notifier from the list
fp_config.notifiers.retain(|n| !Arc::ptr_eq(n, &self.notifier));
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::time::timeout;
#[tokio::test]
async fn test_failpoint_off() {
@@ -286,6 +391,210 @@ mod tests {
matches!(result, FailpointResult::Cancelled);
}
#[tokio::test]
async fn test_failpoint_pause_and_resume() {
let failpoint_name = "test_pause_resume";
configure_failpoint(failpoint_name, "pause").unwrap();
let start = std::time::Instant::now();
// Start a task that hits the failpoint
let task = tokio::spawn(async move {
failpoint(failpoint_name, None).await
});
// Give it time to hit the failpoint and pause
tokio::time::sleep(Duration::from_millis(10)).await;
// Reconfigure the failpoint to "off" to resume
configure_failpoint(failpoint_name, "off").unwrap();
// The task should complete quickly now
let result = timeout(Duration::from_millis(100), task).await.unwrap().unwrap();
let duration = start.elapsed();
// Should have resumed and continued
matches!(result, FailpointResult::Continue);
// Should have taken at least 10ms (initial pause) but less than 1 second
assert!(duration >= Duration::from_millis(10));
assert!(duration < Duration::from_millis(1000));
}
#[tokio::test]
async fn test_failpoint_pause_and_change_to_return() {
let failpoint_name = "test_pause_to_return";
configure_failpoint(failpoint_name, "pause").unwrap();
// Start a task that hits the failpoint
let task = tokio::spawn(async move {
failpoint(failpoint_name, None).await
});
// Give it time to hit the failpoint and pause
tokio::time::sleep(Duration::from_millis(10)).await;
// Reconfigure the failpoint to return a value
configure_failpoint(failpoint_name, "return(resumed)").unwrap();
// The task should complete by resuming (Continue), not with the new return value
// The new configuration only applies to future hits of the failpoint
let result = timeout(Duration::from_millis(100), task).await.unwrap().unwrap();
matches!(result, FailpointResult::Continue);
}
#[tokio::test]
async fn test_failpoint_sleep_and_resume() {
let failpoint_name = "test_sleep_resume";
configure_failpoint(failpoint_name, "sleep(1000)").unwrap(); // 1 second sleep
let start = std::time::Instant::now();
// Start a task that hits the failpoint
let task = tokio::spawn(async move {
failpoint(failpoint_name, None).await
});
// Give it time to hit the failpoint and start sleeping
tokio::time::sleep(Duration::from_millis(10)).await;
// Reconfigure the failpoint to "off" to resume
configure_failpoint(failpoint_name, "off").unwrap();
// The task should complete quickly now
let result = timeout(Duration::from_millis(100), task).await.unwrap().unwrap();
let duration = start.elapsed();
// Should have resumed and continued
matches!(result, FailpointResult::Continue);
// Should have taken much less than the original 1 second sleep
assert!(duration < Duration::from_millis(500));
}
#[tokio::test]
async fn test_failpoint_sleep_and_change_duration() {
let failpoint_name = "test_sleep_change";
configure_failpoint(failpoint_name, "sleep(1000)").unwrap(); // 1 second sleep
let start = std::time::Instant::now();
// Start a task that hits the failpoint
let task = tokio::spawn(async move {
failpoint(failpoint_name, None).await
});
// Give it time to hit the failpoint and start sleeping
tokio::time::sleep(Duration::from_millis(10)).await;
// Change the sleep duration to a shorter time
configure_failpoint(failpoint_name, "sleep(50)").unwrap();
// The task should complete by resuming (Continue), not by sleeping the new duration
let result = timeout(Duration::from_millis(200), task).await.unwrap().unwrap();
let duration = start.elapsed();
// Should have resumed and continued
matches!(result, FailpointResult::Continue);
// Should have taken much less than the original 1 second sleep
assert!(duration < Duration::from_millis(500));
}
#[tokio::test]
async fn test_failpoint_remove_during_pause() {
let failpoint_name = "test_remove_pause";
configure_failpoint(failpoint_name, "pause").unwrap();
// Start a task that hits the failpoint
let task = tokio::spawn(async move {
failpoint(failpoint_name, None).await
});
// Give it time to hit the failpoint and pause
tokio::time::sleep(Duration::from_millis(10)).await;
// Remove the failpoint entirely
remove_failpoint(failpoint_name);
// The task should complete quickly now
let result = timeout(Duration::from_millis(100), task).await.unwrap().unwrap();
// Should have resumed and continued
matches!(result, FailpointResult::Continue);
}
#[tokio::test]
async fn test_multiple_paused_tasks_resume() {
let failpoint_name = "test_multiple_pause";
configure_failpoint(failpoint_name, "pause").unwrap();
// Start multiple tasks that hit the same failpoint
let task1 = tokio::spawn(async move {
failpoint(failpoint_name, None).await
});
let task2 = tokio::spawn(async move {
failpoint(failpoint_name, None).await
});
let task3 = tokio::spawn(async move {
failpoint(failpoint_name, None).await
});
// Give them time to hit the failpoint and pause
tokio::time::sleep(Duration::from_millis(10)).await;
// Reconfigure the failpoint to "off" to resume all tasks
configure_failpoint(failpoint_name, "off").unwrap();
// All tasks should complete quickly now
let result1 = timeout(Duration::from_millis(100), task1).await.unwrap().unwrap();
let result2 = timeout(Duration::from_millis(100), task2).await.unwrap().unwrap();
let result3 = timeout(Duration::from_millis(100), task3).await.unwrap().unwrap();
// All should have resumed and continued
matches!(result1, FailpointResult::Continue);
matches!(result2, FailpointResult::Continue);
matches!(result3, FailpointResult::Continue);
}
#[tokio::test]
async fn test_no_race_condition_on_rapid_config_changes() {
let failpoint_name = "test_race_condition";
// Start with pause
configure_failpoint(failpoint_name, "pause").unwrap();
// Start a task that will hit the failpoint
let task = tokio::spawn(async move {
failpoint(failpoint_name, None).await
});
// Rapidly change the configuration multiple times
for i in 0..10 {
tokio::time::sleep(Duration::from_millis(1)).await;
if i % 2 == 0 {
configure_failpoint(failpoint_name, "pause").unwrap();
} else {
configure_failpoint(failpoint_name, "return(rapid_change)").unwrap();
}
}
// Finally set it to return
configure_failpoint(failpoint_name, "return(final)").unwrap();
// The task should complete by resuming (Continue), not with any return value
let result = timeout(Duration::from_millis(100), task).await.unwrap().unwrap();
// Should resume normally since it was woken up by a configuration change
matches!(result, FailpointResult::Continue);
}
#[tokio::test]
async fn test_context_matching() {
let mut context_matchers = HashMap::new();