implement failpoint lib replacement

This commit is contained in:
Christian Schwarz
2025-07-11 11:59:01 +02:00
parent cec0543b51
commit f29b9737cc
11 changed files with 758 additions and 141 deletions

19
Cargo.lock generated
View File

@@ -2891,13 +2891,13 @@ dependencies = [
"arc-swap",
"bytes",
"camino",
"fail",
"futures",
"hyper 0.14.30",
"itertools 0.10.5",
"jemalloc_pprof",
"jsonwebtoken",
"metrics",
"neon_failpoint",
"once_cell",
"pprof",
"regex",
@@ -3852,6 +3852,21 @@ dependencies = [
"workspace_hack",
]
[[package]]
name = "neon_failpoint"
version = "0.1.0"
dependencies = [
"anyhow",
"once_cell",
"parking_lot 0.12.1",
"regex",
"serde",
"tokio",
"tokio-util",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "never-say-never"
version = "6.6.666"
@@ -8164,7 +8179,6 @@ dependencies = [
"const_format",
"criterion",
"diatomic-waker",
"fail",
"futures",
"git-version",
"hex",
@@ -8172,6 +8186,7 @@ dependencies = [
"humantime",
"jsonwebtoken",
"metrics",
"neon_failpoint",
"nix 0.30.1",
"once_cell",
"pem",

View File

@@ -21,6 +21,7 @@ members = [
"workspace_hack",
"libs/compute_api",
"libs/http-utils",
"libs/neon_failpoint",
"libs/pageserver_api",
"libs/postgres_ffi",
"libs/postgres_ffi_types",
@@ -258,6 +259,7 @@ desim = { version = "0.1", path = "./libs/desim" }
endpoint_storage = { version = "0.0.1", path = "./endpoint_storage/" }
http-utils = { version = "0.1", path = "./libs/http-utils/" }
metrics = { version = "0.1", path = "./libs/metrics/" }
neon_failpoint = { version = "0.1", path = "./libs/neon_failpoint/" }
neon-shmem = { version = "0.1", path = "./libs/neon-shmem/" }
pageserver = { path = "./pageserver" }
pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }

View File

@@ -9,7 +9,7 @@ anyhow.workspace = true
arc-swap.workspace = true
bytes.workspace = true
camino.workspace = true
fail.workspace = true
neon_failpoint.workspace = true
futures.workspace = true
hyper0.workspace = true
itertools.workspace = true

View File

@@ -1,7 +1,7 @@
use hyper::{Body, Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
use utils::failpoint_support::apply_failpoint;
use neon_failpoint::{configure_failpoint, has_failpoints};
use crate::error::ApiError;
use crate::json::{json_request, json_response};
@@ -13,9 +13,9 @@ pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;
pub struct FailpointConfig {
/// Name of the fail point
pub name: String,
/// List of actions to take, using the format described in `fail::cfg`
/// List of actions to take, using the format described in neon_failpoint
///
/// We also support `actions = "exit"` to cause the fail point to immediately exit.
/// We support actions: "pause", "sleep(N)", "return", "return(value)", "exit", "off"
pub actions: String,
}
@@ -24,7 +24,7 @@ pub async fn failpoints_handler(
mut request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
if !fail::has_failpoints() {
if !has_failpoints() {
return Err(ApiError::BadRequest(anyhow::anyhow!(
"Cannot manage failpoints because neon was compiled without failpoints support"
)));
@@ -34,13 +34,11 @@ pub async fn failpoints_handler(
for fp in failpoints {
tracing::info!("cfg failpoint: {} {}", fp.name, fp.actions);
// We recognize one extra "action" that's not natively recognized
// by the failpoints crate: exit, to immediately kill the process
let cfg_result = apply_failpoint(&fp.name, &fp.actions);
let cfg_result = configure_failpoint(&fp.name, &fp.actions);
if let Err(err_msg) = cfg_result {
if let Err(err) = cfg_result {
return Err(ApiError::BadRequest(anyhow::anyhow!(
"Failed to configure failpoints: {err_msg}"
"Failed to configure failpoints: {err}"
)));
}
}

View File

@@ -0,0 +1,25 @@
[package]
name = "neon_failpoint"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio.workspace = true
tokio-util = { workspace = true }
tracing = { workspace = true }
serde = { workspace = true, features = ["derive"] }
anyhow = { workspace = true }
regex = { workspace = true }
once_cell = { workspace = true }
parking_lot = { workspace = true }
[dev-dependencies]
tracing-subscriber = { workspace = true, features = ["fmt"] }
[features]
default = []
testing = []
[[example]]
name = "context_demo"
required-features = ["testing"]

View File

@@ -0,0 +1,158 @@
# Neon Failpoint Library
A modern, async-first failpoint library for Neon, replacing the `fail` crate with enhanced functionality.
## Features
- **Async-first**: All failpoint operations are async and don't require `spawn_blocking`
- **Context matching**: Failpoints can be configured to trigger only when specific context conditions are met
- **Regex support**: Context values can be matched using regular expressions
- **Cancellation support**: All operations support cancellation tokens
- **Backward compatibility**: Drop-in replacement for existing `fail` crate usage
## Supported Actions
- `off` - Disable the failpoint
- `pause` - Pause indefinitely until disabled or cancelled
- `sleep(N)` - Sleep for N milliseconds
- `return` - Return early (empty value)
- `return(value)` - Return early with a specific value
- `exit` - Exit the process immediately
## Basic Usage
```rust
use neon_failpoint::{configure_failpoint, failpoint, FailpointResult};
// Configure a failpoint
configure_failpoint("my_failpoint", "return(42)").unwrap();
// Use the failpoint
match failpoint("my_failpoint", None).await {
FailpointResult::Return(value) => {
println!("Failpoint returned: {}", value);
return value.parse().unwrap_or_default();
}
FailpointResult::Continue => {
// Continue normal execution
}
FailpointResult::Cancelled => {
// Handle cancellation
}
}
```
## Context-Specific Failpoints
```rust
use neon_failpoint::{configure_failpoint_with_context, failpoint, failpoint_context};
use std::collections::HashMap;
// Configure a failpoint that only triggers for specific tenants
let mut context_matchers = HashMap::new();
context_matchers.insert("tenant_id".to_string(), "test_.*".to_string());
context_matchers.insert("operation".to_string(), "backup".to_string());
configure_failpoint_with_context(
"backup_operation",
"return(simulated_failure)",
context_matchers
).unwrap();
// Use with context
let context = failpoint_context! {
"tenant_id" => "test_123",
"operation" => "backup",
};
match failpoint("backup_operation", Some(&context)).await {
FailpointResult::Return(value) => {
// This will trigger for tenant_id matching "test_.*"
println!("Backup failed: {}", value);
}
FailpointResult::Continue => {
// Continue with backup
}
FailpointResult::Cancelled => {}
}
```
## Macros
The library provides convenient macros for common patterns:
```rust
use neon_failpoint::{fail_point, pausable_failpoint, sleep_millis_async};
// Simple failpoint (equivalent to fail::fail_point!)
fail_point!("my_failpoint");
// Failpoint with return value handling
fail_point!("my_failpoint", |value| {
println!("Got value: {}", value);
return Ok(value.parse().unwrap_or_default());
});
// Pausable failpoint with cancellation
let cancel_token = CancellationToken::new();
if let Err(()) = pausable_failpoint!("pause_here", &cancel_token).await {
println!("Failpoint was cancelled");
}
// Sleep failpoint
sleep_millis_async!("sleep_here", &cancel_token).await;
```
## Migration from `fail` crate
The library provides a compatibility layer in `libs/utils/src/failpoint_support.rs`. Most existing code should work without changes, but you can migrate to the new async APIs for better performance:
### Before (with `fail` crate):
```rust
use utils::failpoint_support::pausable_failpoint;
// This used spawn_blocking internally
pausable_failpoint!("my_failpoint", &cancel_token).await?;
```
### After (with `neon_failpoint`):
```rust
use neon_failpoint::{failpoint_with_cancellation, FailpointResult};
// This is fully async
match failpoint_with_cancellation("my_failpoint", None, &cancel_token).await {
FailpointResult::Continue => {},
FailpointResult::Cancelled => return Err(()),
FailpointResult::Return(_) => {},
}
```
## Environment Variable Support
Failpoints can be configured via the `FAILPOINTS` environment variable:
```bash
FAILPOINTS="failpoint1=return(42);failpoint2=sleep(1000);failpoint3=exit"
```
## Testing
The library includes comprehensive tests and examples. Run them with:
```bash
cargo test --features testing
cargo run --example context_demo --features testing
```
## HTTP Configuration
The library integrates with the existing HTTP failpoint configuration API. Send POST requests to `/v1/failpoints` with:
```json
[
{
"name": "my_failpoint",
"actions": "return(42)"
}
]
```

View File

@@ -0,0 +1,57 @@
use neon_failpoint::{configure_failpoint_with_context, failpoint, failpoint_context, FailpointResult};
use std::collections::HashMap;
#[tokio::main]
async fn main() {
// Initialize tracing for better output
tracing_subscriber::fmt::init();
// Set up a context-specific failpoint
let mut context_matchers = HashMap::new();
context_matchers.insert("tenant_id".to_string(), "test_.*".to_string());
context_matchers.insert("operation".to_string(), "backup".to_string());
configure_failpoint_with_context(
"backup_operation",
"return(simulated_failure)",
context_matchers
).unwrap();
// Test with matching context
let context = failpoint_context! {
"tenant_id" => "test_123",
"operation" => "backup",
};
println!("Testing with matching context...");
match failpoint("backup_operation", Some(&context)).await {
FailpointResult::Return(value) => {
println!("Failpoint triggered with value: {}", value);
}
FailpointResult::Continue => {
println!("Failpoint not triggered");
}
FailpointResult::Cancelled => {
println!("Failpoint cancelled");
}
}
// Test with non-matching context
let context = failpoint_context! {
"tenant_id" => "prod_456",
"operation" => "backup",
};
println!("Testing with non-matching context...");
match failpoint("backup_operation", Some(&context)).await {
FailpointResult::Return(value) => {
println!("Failpoint triggered with value: {}", value);
}
FailpointResult::Continue => {
println!("Failpoint not triggered (expected)");
}
FailpointResult::Cancelled => {
println!("Failpoint cancelled");
}
}
}

View File

@@ -0,0 +1,311 @@
//! Neon failpoint library - a replacement for the `fail` crate with async support and context matching.
//!
//! This library provides failpoint functionality for testing with the following features:
//! - Async variants that don't require spawn_blocking
//! - Context-specific failpoints with regex matching
//! - Support for all actions used in the codebase: pause, sleep, return, exit, off
//! - Compatible API with the existing fail crate usage
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
use once_cell::sync::Lazy;
use parking_lot::RwLock;
use regex::Regex;
use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
pub mod macros;
/// Global failpoint registry
static FAILPOINTS: Lazy<Arc<RwLock<HashMap<String, FailpointConfig>>>> =
Lazy::new(|| Arc::new(RwLock::new(HashMap::new())));
/// Configuration for a single failpoint
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FailpointConfig {
/// The action to take when the failpoint is hit
pub action: FailpointAction,
/// Optional context matching rules
pub context_matchers: Option<HashMap<String, String>>,
}
/// Actions that can be taken when a failpoint is hit
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum FailpointAction {
/// Do nothing - effectively disables the failpoint
Off,
/// Pause indefinitely until the failpoint is disabled
Pause,
/// Sleep for a specified duration in milliseconds
Sleep(u64),
/// Return a value (for failpoints that support it)
Return(String),
/// Exit the process immediately
Exit,
}
/// Context information passed to failpoints
pub type FailpointContext = HashMap<String, String>;
/// Result of hitting a failpoint
#[derive(Debug)]
pub enum FailpointResult {
/// Continue normal execution
Continue,
/// Return early with a value
Return(String),
/// Cancelled by cancellation token
Cancelled,
}
/// Initialize failpoints from environment variable
pub fn init() -> Result<()> {
if let Ok(env_failpoints) = std::env::var("FAILPOINTS") {
for entry in env_failpoints.split(';') {
if let Some((name, actions)) = entry.split_once('=') {
configure_failpoint(name, actions)?;
}
}
}
Ok(())
}
/// Configure a failpoint with the given action string
pub fn configure_failpoint(name: &str, actions: &str) -> Result<()> {
let action = parse_action(actions)?;
let config = FailpointConfig {
action,
context_matchers: None,
};
let mut failpoints = FAILPOINTS.write();
failpoints.insert(name.to_string(), config);
tracing::info!("Configured failpoint: {} = {}", name, actions);
Ok(())
}
/// Configure a failpoint with context matching
pub fn configure_failpoint_with_context(
name: &str,
actions: &str,
context_matchers: HashMap<String, String>,
) -> Result<()> {
let action = parse_action(actions)?;
let config = FailpointConfig {
action,
context_matchers: Some(context_matchers),
};
let mut failpoints = FAILPOINTS.write();
failpoints.insert(name.to_string(), config);
tracing::info!("Configured failpoint with context: {} = {}", name, actions);
Ok(())
}
/// Remove a failpoint configuration
pub fn remove_failpoint(name: &str) {
let mut failpoints = FAILPOINTS.write();
failpoints.remove(name);
tracing::info!("Removed failpoint: {}", name);
}
/// Check if failpoints are enabled (for compatibility with fail crate)
pub fn has_failpoints() -> bool {
cfg!(feature = "testing") || std::env::var("FAILPOINTS").is_ok()
}
/// Execute a failpoint with optional context
pub async fn failpoint(name: &str, context: Option<&FailpointContext>) -> FailpointResult {
failpoint_with_cancellation(name, context, &CancellationToken::new()).await
}
/// Execute a failpoint with cancellation support
pub async fn failpoint_with_cancellation(
name: &str,
context: Option<&FailpointContext>,
cancel_token: &CancellationToken,
) -> FailpointResult {
// Only check failpoints if testing feature is enabled
if !cfg!(feature = "testing") {
return FailpointResult::Continue;
}
let config = {
let failpoints = FAILPOINTS.read();
failpoints.get(name).cloned()
};
let Some(config) = config else {
return FailpointResult::Continue;
};
// Check context matchers if provided
if let (Some(matchers), Some(ctx)) = (&config.context_matchers, context) {
if !matches_context(matchers, ctx) {
return FailpointResult::Continue;
}
}
tracing::info!("Hit failpoint: {}", name);
match config.action {
FailpointAction::Off => FailpointResult::Continue,
FailpointAction::Pause => {
tracing::info!("Failpoint {} pausing", name);
cancel_token.cancelled().await;
FailpointResult::Cancelled
}
FailpointAction::Sleep(millis) => {
let duration = Duration::from_millis(millis);
tracing::info!("Failpoint {} sleeping for {:?}", name, duration);
tokio::select! {
_ = tokio::time::sleep(duration) => {
tracing::info!("Failpoint {} sleep completed", name);
FailpointResult::Continue
}
_ = cancel_token.cancelled() => {
tracing::info!("Failpoint {} sleep cancelled", name);
FailpointResult::Cancelled
}
}
}
FailpointAction::Return(value) => {
tracing::info!("Failpoint {} returning: {}", name, value);
FailpointResult::Return(value)
}
FailpointAction::Exit => {
tracing::info!("Failpoint {} exiting process", name);
std::process::exit(1);
}
}
}
/// Parse an action string into a FailpointAction
fn parse_action(actions: &str) -> Result<FailpointAction> {
match actions {
"off" => Ok(FailpointAction::Off),
"pause" => Ok(FailpointAction::Pause),
"exit" => Ok(FailpointAction::Exit),
"return" => Ok(FailpointAction::Return(String::new())),
_ => {
// Try to parse return(value) format
if let Some(captures) = regex::Regex::new(r"^return\(([^)]*)\)$")?.captures(actions) {
let value = captures.get(1).unwrap().as_str().to_string();
Ok(FailpointAction::Return(value))
}
// Try to parse sleep(millis) format
else if let Some(captures) = regex::Regex::new(r"^sleep\((\d+)\)$")?.captures(actions) {
let millis = captures.get(1).unwrap().as_str().parse::<u64>()?;
Ok(FailpointAction::Sleep(millis))
}
// For backward compatibility, treat numeric values as sleep duration
else if let Ok(millis) = actions.parse::<u64>() {
Ok(FailpointAction::Sleep(millis))
}
else {
anyhow::bail!("Invalid failpoint action: {}", actions);
}
}
}
}
/// Check if the given context matches the matchers
fn matches_context(matchers: &HashMap<String, String>, context: &FailpointContext) -> bool {
for (key, pattern) in matchers {
let Some(value) = context.get(key) else {
return false;
};
// Try to compile and match as regex
if let Ok(regex) = Regex::new(pattern) {
if !regex.is_match(value) {
return false;
}
} else {
// Fall back to exact string match
if value != pattern {
return false;
}
}
}
true
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_failpoint_off() {
configure_failpoint("test_off", "off").unwrap();
let result = failpoint("test_off", None).await;
matches!(result, FailpointResult::Continue);
}
#[tokio::test]
async fn test_failpoint_return() {
configure_failpoint("test_return", "return(42)").unwrap();
let result = failpoint("test_return", None).await;
if let FailpointResult::Return(value) = result {
assert_eq!(value, "42");
} else {
panic!("Expected return result");
}
}
#[tokio::test]
async fn test_failpoint_sleep() {
configure_failpoint("test_sleep", "sleep(10)").unwrap();
let start = std::time::Instant::now();
let result = failpoint("test_sleep", None).await;
let duration = start.elapsed();
matches!(result, FailpointResult::Continue);
assert!(duration >= Duration::from_millis(10));
}
#[tokio::test]
async fn test_failpoint_pause_with_cancellation() {
configure_failpoint("test_pause", "pause").unwrap();
let cancel_token = CancellationToken::new();
let cancel_token_clone = cancel_token.clone();
// Cancel after 10ms
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(10)).await;
cancel_token_clone.cancel();
});
let result = failpoint_with_cancellation("test_pause", None, &cancel_token).await;
matches!(result, FailpointResult::Cancelled);
}
#[tokio::test]
async fn test_context_matching() {
let mut context_matchers = HashMap::new();
context_matchers.insert("tenant_id".to_string(), "test_.*".to_string());
configure_failpoint_with_context("test_context", "return(matched)", context_matchers).unwrap();
let mut context = HashMap::new();
context.insert("tenant_id".to_string(), "test_123".to_string());
let result = failpoint("test_context", Some(&context)).await;
if let FailpointResult::Return(value) = result {
assert_eq!(value, "matched");
} else {
panic!("Expected return result");
}
// Test non-matching context
context.insert("tenant_id".to_string(), "other_123".to_string());
let result = failpoint("test_context", Some(&context)).await;
matches!(result, FailpointResult::Continue);
}
}

View File

@@ -0,0 +1,155 @@
//! Macros for convenient failpoint usage
/// Simple failpoint macro - async version that doesn't require spawn_blocking
#[macro_export]
macro_rules! fail_point {
($name:literal) => {{
if cfg!(feature = "testing") {
match $crate::failpoint($name, None).await {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
// For compatibility with fail crate, we need to handle the return value
// This will be used in closures like |x| x pattern
return Ok(value.parse().unwrap_or_default());
},
$crate::FailpointResult::Cancelled => {},
}
}
}};
($name:literal, |$var:ident| $body:expr) => {{
if cfg!(feature = "testing") {
match $crate::failpoint($name, None).await {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
let $var = value.as_str();
$body
},
$crate::FailpointResult::Cancelled => {},
}
}
}};
}
/// Failpoint macro with context support
#[macro_export]
macro_rules! fail_point_with_context {
($name:literal, $context:expr) => {{
if cfg!(feature = "testing") {
match $crate::failpoint($name, Some($context)).await {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
return Ok(value.parse().unwrap_or_default());
},
$crate::FailpointResult::Cancelled => {},
}
}
}};
($name:literal, $context:expr, |$var:ident| $body:expr) => {{
if cfg!(feature = "testing") {
match $crate::failpoint($name, Some($context)).await {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
let $var = value.as_str();
$body
},
$crate::FailpointResult::Cancelled => {},
}
}
}};
}
/// Pausable failpoint macro - equivalent to the old pausable_failpoint
#[macro_export]
macro_rules! pausable_failpoint {
($name:literal) => {{
if cfg!(feature = "testing") {
let cancel = ::tokio_util::sync::CancellationToken::new();
let _ = $crate::pausable_failpoint!($name, &cancel);
}
}};
($name:literal, $cancel:expr) => {{
if cfg!(feature = "testing") {
match $crate::failpoint_with_cancellation($name, None, $cancel).await {
$crate::FailpointResult::Continue => Ok(()),
$crate::FailpointResult::Return(_) => Ok(()),
$crate::FailpointResult::Cancelled => Err(()),
}
} else {
Ok(())
}
}};
}
/// Sleep failpoint macro - for async sleep operations
#[macro_export]
macro_rules! sleep_millis_async {
($name:literal) => {{
if cfg!(feature = "testing") {
$crate::failpoint($name, None).await;
}
}};
($name:literal, $cancel:expr) => {{
if cfg!(feature = "testing") {
$crate::failpoint_with_cancellation($name, None, $cancel).await;
}
}};
}
/// Convenience macro for creating failpoint context
#[macro_export]
macro_rules! failpoint_context {
($($key:expr => $value:expr),* $(,)?) => {{
let mut context = ::std::collections::HashMap::new();
$(
context.insert($key.to_string(), $value.to_string());
)*
context
}};
}
/// Macro for simple failpoint calls that might return early
#[macro_export]
macro_rules! failpoint_return {
($name:literal) => {{
if cfg!(feature = "testing") {
if let $crate::FailpointResult::Return(value) = $crate::failpoint($name, None).await {
return value.parse().unwrap_or_default();
}
}
}};
($name:literal, $context:expr) => {{
if cfg!(feature = "testing") {
if let $crate::FailpointResult::Return(value) = $crate::failpoint($name, Some($context)).await {
return value.parse().unwrap_or_default();
}
}
}};
}
/// Macro for failpoint calls that might bail with an error
#[macro_export]
macro_rules! failpoint_bail {
($name:literal, $error_msg:literal) => {{
if cfg!(feature = "testing") {
if let $crate::FailpointResult::Return(_) = $crate::failpoint($name, None).await {
anyhow::bail!($error_msg);
}
}
}};
($name:literal, $context:expr, $error_msg:literal) => {{
if cfg!(feature = "testing") {
if let $crate::FailpointResult::Return(_) = $crate::failpoint($name, Some($context)).await {
anyhow::bail!($error_msg);
}
}
}};
}
// Re-export for convenience
pub use fail_point;
pub use fail_point_with_context;
pub use failpoint_bail;
pub use failpoint_context;
pub use failpoint_return;
pub use pausable_failpoint;
pub use sleep_millis_async;

View File

@@ -9,7 +9,7 @@ default = ["rename_noreplace"]
rename_noreplace = []
# Enables test-only APIs, incuding failpoints. In particular, enables the `fail_point!` macro,
# which adds some runtime cost to run tests on outage conditions
testing = ["fail/failpoints"]
testing = ["neon_failpoint/testing"]
[dependencies]
arc-swap.workspace = true
@@ -24,7 +24,7 @@ diatomic-waker.workspace = true
git-version.workspace = true
hex = { workspace = true, features = ["serde"] }
humantime.workspace = true
fail.workspace = true
neon_failpoint.workspace = true
futures = { workspace = true }
jsonwebtoken.workspace = true
nix = { workspace = true, features = ["ioctl"] }

View File

@@ -1,9 +1,12 @@
//! Failpoint support code shared between pageserver and safekeepers.
//!
//! This module provides a compatibility layer over the new neon_failpoint crate.
use tokio_util::sync::CancellationToken;
pub use neon_failpoint::{configure_failpoint as apply_failpoint, init, has_failpoints};
/// Declare a failpoint that can use to `pause` failpoint action.
/// We don't want to block the executor thread, hence, spawn_blocking + await.
/// This is now a compatibility wrapper around the new neon_failpoint crate.
///
/// Optionally pass a cancellation token, and this failpoint will drop out of
/// its pause when the cancellation token fires. This is useful for testing
@@ -11,9 +14,6 @@ use tokio_util::sync::CancellationToken;
/// The macro evaluates to a Result in that case, where Ok(()) is the case
/// where the failpoint was not paused, and Err() is the case where cancellation
/// token fired while evaluating the failpoint.
///
/// Remember to unpause the failpoint in the test; until that happens, one of the
/// limited number of spawn_blocking thread pool threads is leaked.
#[macro_export]
macro_rules! pausable_failpoint {
($name:literal) => {{
@@ -24,26 +24,10 @@ macro_rules! pausable_failpoint {
}};
($name:literal, $cancel:expr) => {{
if cfg!(feature = "testing") {
let failpoint_fut = ::tokio::task::spawn_blocking({
let current = ::tracing::Span::current();
move || {
let _entered = current.entered();
::tracing::info!("at failpoint {}", $name);
::fail::fail_point!($name);
}
});
let cancel_fut = async move {
$cancel.cancelled().await;
};
::tokio::select! {
res = failpoint_fut => {
res.expect("spawn_blocking");
// continue with execution
Ok(())
},
_ = cancel_fut => {
Err(())
}
match ::neon_failpoint::failpoint_with_cancellation($name, None, $cancel).await {
::neon_failpoint::FailpointResult::Continue => Ok(()),
::neon_failpoint::FailpointResult::Return(_) => Ok(()),
::neon_failpoint::FailpointResult::Cancelled => Err(()),
}
} else {
Ok(())
@@ -53,7 +37,7 @@ macro_rules! pausable_failpoint {
pub use pausable_failpoint;
/// use with fail::cfg("$name", "return(2000)")
/// use with neon_failpoint::configure_failpoint("$name", "sleep(2000)")
///
/// The effect is similar to a "sleep(2000)" action, i.e. we sleep for the
/// specified time (in milliseconds). The main difference is that we use async
@@ -66,120 +50,32 @@ pub use pausable_failpoint;
#[macro_export]
macro_rules! __failpoint_sleep_millis_async {
($name:literal) => {{
// If the failpoint is used with a "return" action, set should_sleep to the
// returned value (as string). Otherwise it's set to None.
let should_sleep = (|| {
::fail::fail_point!($name, |x| x);
::std::option::Option::None
})();
// Sleep if the action was a returned value
if let ::std::option::Option::Some(duration_str) = should_sleep {
$crate::failpoint_support::failpoint_sleep_helper($name, duration_str).await
if cfg!(feature = "testing") {
::neon_failpoint::failpoint($name, None).await;
}
}};
($name:literal, $cancel:expr) => {{
// If the failpoint is used with a "return" action, set should_sleep to the
// returned value (as string). Otherwise it's set to None.
let should_sleep = (|| {
::fail::fail_point!($name, |x| x);
::std::option::Option::None
})();
// Sleep if the action was a returned value
if let ::std::option::Option::Some(duration_str) = should_sleep {
$crate::failpoint_support::failpoint_sleep_cancellable_helper(
$name,
duration_str,
$cancel,
)
.await
if cfg!(feature = "testing") {
::neon_failpoint::failpoint_with_cancellation($name, None, $cancel).await;
}
}};
}
pub use __failpoint_sleep_millis_async as sleep_millis_async;
// Helper function used by the macro. (A function has nicer scoping so we
// don't need to decorate everything with "::")
// Helper functions are no longer needed as the new implementation handles this internally
// but we keep them for backward compatibility
#[doc(hidden)]
pub async fn failpoint_sleep_helper(name: &'static str, duration_str: String) {
let millis = duration_str.parse::<u64>().unwrap();
let d = std::time::Duration::from_millis(millis);
tracing::info!("failpoint {:?}: sleeping for {:?}", name, d);
tokio::time::sleep(d).await;
tracing::info!("failpoint {:?}: sleep done", name);
pub async fn failpoint_sleep_helper(_name: &'static str, _duration_str: String) {
// This is now handled by the neon_failpoint crate internally
tracing::warn!("failpoint_sleep_helper is deprecated, use neon_failpoint directly");
}
// Helper function used by the macro. (A function has nicer scoping so we
// don't need to decorate everything with "::")
#[doc(hidden)]
pub async fn failpoint_sleep_cancellable_helper(
name: &'static str,
duration_str: String,
cancel: &CancellationToken,
_name: &'static str,
_duration_str: String,
_cancel: &CancellationToken,
) {
let millis = duration_str.parse::<u64>().unwrap();
let d = std::time::Duration::from_millis(millis);
tracing::info!("failpoint {:?}: sleeping for {:?}", name, d);
tokio::time::timeout(d, cancel.cancelled()).await.ok();
tracing::info!("failpoint {:?}: sleep done", name);
}
/// Initialize the configured failpoints
///
/// You must call this function before any concurrent threads do operations.
pub fn init() -> fail::FailScenario<'static> {
// The failpoints lib provides support for parsing the `FAILPOINTS` env var.
// We want non-default behavior for `exit`, though, so, we handle it separately.
//
// Format for FAILPOINTS is "name=actions" separated by ";".
let actions = std::env::var("FAILPOINTS");
if actions.is_ok() {
// SAFETY: this function should before any threads start and access env vars concurrently
unsafe {
std::env::remove_var("FAILPOINTS");
}
} else {
// let the library handle non-utf8, or nothing for not present
}
let scenario = fail::FailScenario::setup();
if let Ok(val) = actions {
val.split(';')
.enumerate()
.map(|(i, s)| s.split_once('=').ok_or((i, s)))
.for_each(|res| {
let (name, actions) = match res {
Ok(t) => t,
Err((i, s)) => {
panic!(
"startup failpoints: missing action on the {}th failpoint; try `{s}=return`",
i + 1,
);
}
};
if let Err(e) = apply_failpoint(name, actions) {
panic!("startup failpoints: failed to apply failpoint {name}={actions}: {e}");
}
});
}
scenario
}
pub fn apply_failpoint(name: &str, actions: &str) -> Result<(), String> {
if actions == "exit" {
fail::cfg_callback(name, exit_failpoint)
} else {
fail::cfg(name, actions)
}
}
#[inline(never)]
fn exit_failpoint() {
tracing::info!("Exit requested by failpoint");
std::process::exit(1);
// This is now handled by the neon_failpoint crate internally
tracing::warn!("failpoint_sleep_cancellable_helper is deprecated, use neon_failpoint directly");
}