Compare commits

...

28 Commits

Author SHA1 Message Date
Christian Schwarz 0f400096b1 lints 2025-07-11 18:14:51 +00:00
Christian Schwarz 9baacfa406 cargo fmt 2025-07-11 18:12:37 +00:00
Christian Schwarz 904a63dff5 push down Arc<Mutex>> 2025-07-11 18:12:21 +00:00
Christian Schwarz 67dbe63275 avoid the callback 2025-07-11 17:58:07 +00:00
Christian Schwarz 8662463b06 less lock contention 2025-07-11 17:55:13 +00:00
Christian Schwarz 78ad89b4d5 some abstraction for notifiers 2025-07-11 16:51:40 +00:00
Christian Schwarz 9c63b7c39c switch to std::sync::Mutex 2025-07-11 16:30:07 +00:00
Christian Schwarz 69918a041c storcon was not passing headers 2025-07-11 16:13:55 +00:00
Christian Schwarz 3306c5045b failpoint_supper::sleep_millis_async was wrong, use sync failpoints 2025-07-11 15:55:49 +00:00
Christian Schwarz 137328f304 initial set of fixups 2025-07-11 14:07:33 +00:00
Christian Schwarz 19b0a79968 cargo fmt 2025-07-11 15:45:42 +02:00
Christian Schwarz 5d50c3c086 compile fix 2025-07-11 15:45:09 +02:00
Christian Schwarz 10955dfb52 clippy 2025-07-11 15:44:14 +02:00
Christian Schwarz a3e05b5abf everything compiles 2025-07-11 15:39:12 +02:00
Christian Schwarz 2156d02658 reexport either 2025-07-11 15:19:48 +02:00
Christian Schwarz 9e923bcc9b remove unused stuff 2025-07-11 15:16:50 +02:00
Christian Schwarz 3a3062d236 great progress although that Pin<Box<dyn Future>> hurts perf, let's revisit it 2025-07-11 15:11:00 +02:00
Christian Schwarz 01cd326153 pageserver crate: install neon_failpoint as fail + cargo fmt 2025-07-11 14:39:41 +02:00
Christian Schwarz a1095efd85 macros fixup 2025-07-11 14:28:40 +02:00
Christian Schwarz 75d4ccb05c support context failpoint in client and mgmt API 2025-07-11 14:26:23 +02:00
Christian Schwarz 415cdff336 audit for failpoint usage, implement missing ops 2025-07-11 14:00:23 +02:00
Christian Schwarz 27dc11f5cc apply to the _with_context() macros and update docs 2025-07-11 13:11:17 +02:00
Christian Schwarz ac5279e600 fix macros more 2025-07-11 13:03:22 +02:00
Christian Schwarz 9aeee51bf3 safekeeper compiles now 2025-07-11 12:53:33 +02:00
Christian Schwarz fc3f55d236 fix failpoint macros 2025-07-11 12:46:33 +02:00
Christian Schwarz 8a04a62ecd remove fail crate, put neon_failpoint in place 2025-07-11 12:37:09 +02:00
Christian Schwarz 4adc3bdd3a fixups 2025-07-11 12:29:39 +02:00
Christian Schwarz f29b9737cc implement failpoint lib replacement 2025-07-11 11:59:01 +02:00
46 changed files with 2302 additions and 241 deletions

Cargo.lock generated
View File

@@ -1330,7 +1330,6 @@ dependencies = [
"chrono",
"clap",
"compute_api",
"fail",
"flate2",
"futures",
"hostname-validator",
@@ -1339,6 +1338,7 @@ dependencies = [
"itertools 0.10.5",
"jsonwebtoken",
"metrics",
"neon_failpoint",
"nix 0.30.1",
"notify",
"num_cpus",
@@ -2891,13 +2891,13 @@ dependencies = [
"arc-swap",
"bytes",
"camino",
"fail",
"futures",
"hyper 0.14.30",
"itertools 0.10.5",
"jemalloc_pprof",
"jsonwebtoken",
"metrics",
"neon_failpoint",
"once_cell",
"pprof",
"regex",
@@ -3852,6 +3852,23 @@ dependencies = [
"workspace_hack",
]
[[package]]
name = "neon_failpoint"
version = "0.1.0"
dependencies = [
"anyhow",
"either",
"once_cell",
"parking_lot 0.12.1",
"rand 0.8.5",
"regex",
"serde",
"tokio",
"tokio-util",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "never-say-never"
version = "6.6.666"
@@ -4357,7 +4374,6 @@ dependencies = [
"either",
"enum-map",
"enumset",
"fail",
"futures",
"hashlink",
"hex",
@@ -4372,6 +4388,7 @@ dependencies = [
"jsonwebtoken",
"md5",
"metrics",
"neon_failpoint",
"nix 0.30.1",
"num-traits",
"num_cpus",
@@ -6191,7 +6208,6 @@ dependencies = [
"criterion",
"desim",
"env_logger",
"fail",
"futures",
"hex",
"http 1.1.0",
@@ -6201,6 +6217,7 @@ dependencies = [
"itertools 0.10.5",
"jsonwebtoken",
"metrics",
"neon_failpoint",
"once_cell",
"pageserver_api",
"parking_lot 0.12.1",
@@ -6887,7 +6904,6 @@ dependencies = [
"diesel",
"diesel-async",
"diesel_migrations",
"fail",
"futures",
"governor",
"hex",
@@ -6900,6 +6916,7 @@ dependencies = [
"lasso",
"measured",
"metrics",
"neon_failpoint",
"once_cell",
"pageserver_api",
"pageserver_client",
@@ -8164,7 +8181,7 @@ dependencies = [
"const_format",
"criterion",
"diatomic-waker",
"fail",
"either",
"futures",
"git-version",
"hex",
@@ -8172,6 +8189,7 @@ dependencies = [
"humantime",
"jsonwebtoken",
"metrics",
"neon_failpoint",
"nix 0.30.1",
"once_cell",
"pem",

View File

@@ -21,6 +21,7 @@ members = [
"workspace_hack",
"libs/compute_api",
"libs/http-utils",
"libs/neon_failpoint",
"libs/pageserver_api",
"libs/postgres_ffi",
"libs/postgres_ffi_types",
@@ -97,7 +98,6 @@ diatomic-waker = { version = "0.2.3" }
either = "1.8"
enum-map = "2.4.2"
enumset = "1.0.12"
fail = "0.5.0"
fallible-iterator = "0.2"
framed-websockets = { version = "0.1.0", git = "https://github.com/neondatabase/framed-websockets" }
futures = "0.3"
@@ -258,6 +258,7 @@ desim = { version = "0.1", path = "./libs/desim" }
endpoint_storage = { version = "0.0.1", path = "./endpoint_storage/" }
http-utils = { version = "0.1", path = "./libs/http-utils/" }
metrics = { version = "0.1", path = "./libs/metrics/" }
neon_failpoint = { version = "0.1", path = "./libs/neon_failpoint/" }
neon-shmem = { version = "0.1", path = "./libs/neon-shmem/" }
pageserver = { path = "./pageserver" }
pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }

View File

@@ -7,7 +7,7 @@ license.workspace = true
[features]
default = []
# Enables test specific features.
testing = ["fail/failpoints"]
testing = ["neon_failpoint/testing"]
[dependencies]
async-compression.workspace = true
@@ -23,7 +23,7 @@ camino.workspace = true
chrono.workspace = true
cfg-if.workspace = true
clap.workspace = true
fail.workspace = true
neon_failpoint.workspace = true
flate2.workspace = true
futures.workspace = true
http.workspace = true

View File

@@ -154,7 +154,7 @@ impl Cli {
fn main() -> Result<()> {
let cli = Cli::parse();
let scenario = failpoint_support::init();
failpoint_support::init().unwrap();
// For historical reasons, the main thread that processes the config and launches postgres
// is synchronous, but we always have this tokio runtime available and we "enter" it so
@@ -201,8 +201,6 @@ fn main() -> Result<()> {
let exit_code = compute_node.run()?;
scenario.teardown();
deinit_and_exit(exit_code);
}

View File

@@ -1,8 +1,9 @@
use axum::response::{IntoResponse, Response};
use http::StatusCode;
use neon_failpoint::{configure_failpoint, configure_failpoint_with_context};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tracing::info;
use utils::failpoint_support::apply_failpoint;
pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;
@@ -11,10 +12,16 @@ pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;
pub struct FailpointConfig {
/// Name of the fail point
pub name: String,
/// List of actions to take, using the format described in `fail::cfg`
/// List of actions to take, using the format described in neon_failpoint
///
/// We also support `actions = "exit"` to cause the fail point to immediately exit.
/// We support actions: "pause", "sleep(N)", "return", "return(value)", "exit", "off", "panic(message)"
/// Plus probability-based actions: "N%return(value)", "N%M*return(value)", "N%action", "N%M*action"
pub actions: String,
/// Optional context matching rules for conditional failpoints
/// Each key-value pair specifies a context key and a regex pattern to match against
/// All context matchers must match for the failpoint to trigger
#[serde(default, skip_serializing_if = "Option::is_none")]
pub context_matchers: Option<HashMap<String, String>>,
}
use crate::http::JsonResponse;
@@ -24,7 +31,7 @@ use crate::http::extract::Json;
pub(in crate::http) async fn configure_failpoints(
failpoints: Json<ConfigureFailpointsRequest>,
) -> Response {
if !fail::has_failpoints() {
if !neon_failpoint::has_failpoints() {
return JsonResponse::error(
StatusCode::PRECONDITION_FAILED,
"Cannot manage failpoints because neon was compiled without failpoints support",
@@ -32,16 +39,21 @@ pub(in crate::http) async fn configure_failpoints(
}
for fp in &*failpoints {
info!("cfg failpoint: {} {}", fp.name, fp.actions);
info!(
"cfg failpoint: {} {} (context: {:?})",
fp.name, fp.actions, fp.context_matchers
);
// We recognize one extra "action" that's not natively recognized
// by the failpoints crate: exit, to immediately kill the process
let cfg_result = apply_failpoint(&fp.name, &fp.actions);
let cfg_result = if let Some(context_matchers) = fp.context_matchers.clone() {
configure_failpoint_with_context(&fp.name, &fp.actions, context_matchers)
} else {
configure_failpoint(&fp.name, &fp.actions)
};
if let Err(e) = cfg_result {
return JsonResponse::error(
StatusCode::BAD_REQUEST,
format!("failed to configure failpoints: {e}"),
format!("failed to configure failpoint '{}': {e}", fp.name),
);
}
}

View File

@@ -1,5 +1,5 @@
use anyhow::{Context, Result};
use fail::fail_point;
use neon_failpoint::fail_point;
use tokio_postgres::{Client, Transaction};
use tracing::{error, info};
@@ -40,13 +40,14 @@ impl<'m> MigrationRunner<'m> {
// middle of applying a series of migrations fails in an expected
// manner
if cfg!(feature = "testing") {
let fail = (|| {
fail_point!("compute-migration", |fail_migration_id| {
let fail = async {
fail_point!("compute-migration", |fail_migration_id: Option<String>| {
migration_id == fail_migration_id.unwrap().parse::<i64>().unwrap()
});
false
})();
}
.await;
if fail {
return Err(anyhow::anyhow!(format!(

View File

@@ -9,7 +9,7 @@ anyhow.workspace = true
arc-swap.workspace = true
bytes.workspace = true
camino.workspace = true
fail.workspace = true
neon_failpoint.workspace = true
futures.workspace = true
hyper0.workspace = true
itertools.workspace = true

View File

@@ -1,7 +1,8 @@
use hyper::{Body, Request, Response, StatusCode};
use neon_failpoint::{configure_failpoint, configure_failpoint_with_context, has_failpoints};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tokio_util::sync::CancellationToken;
use utils::failpoint_support::apply_failpoint;
use crate::error::ApiError;
use crate::json::{json_request, json_response};
@@ -13,10 +14,16 @@ pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;
pub struct FailpointConfig {
/// Name of the fail point
pub name: String,
/// List of actions to take, using the format described in `fail::cfg`
/// List of actions to take, using the format described in neon_failpoint
///
/// We also support `actions = "exit"` to cause the fail point to immediately exit.
/// We support actions: "pause", "sleep(N)", "return", "return(value)", "exit", "off", "panic(message)"
/// Plus probability-based actions: "N%return(value)", "N%M*return(value)", "N%action", "N%M*action"
pub actions: String,
/// Optional context matching rules for conditional failpoints
/// Each key-value pair specifies a context key and a regex pattern to match against
/// All context matchers must match for the failpoint to trigger
#[serde(default, skip_serializing_if = "Option::is_none")]
pub context_matchers: Option<HashMap<String, String>>,
}
/// Configure failpoints through http.
@@ -24,7 +31,7 @@ pub async fn failpoints_handler(
mut request: Request<Body>,
_cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
if !fail::has_failpoints() {
if !has_failpoints() {
return Err(ApiError::BadRequest(anyhow::anyhow!(
"Cannot manage failpoints because neon was compiled without failpoints support"
)));
@@ -32,15 +39,24 @@ pub async fn failpoints_handler(
let failpoints: ConfigureFailpointsRequest = json_request(&mut request).await?;
for fp in failpoints {
tracing::info!("cfg failpoint: {} {}", fp.name, fp.actions);
tracing::info!(
"cfg failpoint: {} {} (context: {:?})",
fp.name,
fp.actions,
fp.context_matchers
);
// We recognize one extra "action" that's not natively recognized
// by the failpoints crate: exit, to immediately kill the process
let cfg_result = apply_failpoint(&fp.name, &fp.actions);
let cfg_result = if let Some(context_matchers) = fp.context_matchers {
configure_failpoint_with_context(&fp.name, &fp.actions, context_matchers)
} else {
configure_failpoint(&fp.name, &fp.actions)
};
if let Err(err_msg) = cfg_result {
if let Err(err) = cfg_result {
return Err(ApiError::BadRequest(anyhow::anyhow!(
"Failed to configure failpoints: {err_msg}"
"Failed to configure failpoint '{}': {}",
fp.name,
err
)));
}
}

View File

@@ -0,0 +1,27 @@
[package]
name = "neon_failpoint"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio = { workspace = true, features = ["time", "sync", "rt-multi-thread"] }
tokio-util = { workspace = true }
tracing = { workspace = true }
serde = { workspace = true, features = ["derive"] }
anyhow = { workspace = true }
regex = { workspace = true }
once_cell = { workspace = true }
parking_lot = { workspace = true }
rand = { workspace = true }
either = { workspace = true }
[dev-dependencies]
tracing-subscriber = { workspace = true, features = ["fmt"] }
[features]
default = []
testing = []
[[example]]
name = "context_demo"
required-features = ["testing"]

View File

@@ -0,0 +1,460 @@
# Neon Failpoint Library
A modern, async-first failpoint library for Neon, replacing the `fail` crate with enhanced functionality.
## Features
- **Async-first**: All failpoint operations are async and don't require `spawn_blocking`
- **Context matching**: Failpoints can be configured to trigger only when specific context conditions are met
- **Regex support**: Context values can be matched using regular expressions
- **Cancellation support**: All operations support cancellation tokens
- **Dynamic reconfiguration**: Paused and sleeping tasks automatically resume when failpoint configurations change
- **Backward compatibility**: Drop-in replacement for existing `fail` crate usage
## Supported Actions
- `off` - Disable the failpoint
- `pause` - Pause indefinitely until disabled, reconfigured, or cancelled
- `sleep(N)` - Sleep for N milliseconds (can be interrupted by reconfiguration)
- `return` - Return early (empty value)
- `return(value)` - Return early with a specific value
- `exit` - Exit the process immediately
- `panic(message)` - Panic the process with a custom message
- `N%return(value)` - Return with a specific value N% of the time (probability-based)
- `N%M*return(value)` - Return with a specific value N% of the time, maximum M times
- `N%action` - Execute any action N% of the time (probability-based)
- `N%M*action` - Execute any action N% of the time, maximum M times
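A minimal sketch of cycling a single failpoint through a few of these actions from test code; the failpoint name `before-upload` is invented for illustration:
```rust
use neon_failpoint::configure_failpoint;

// Pause every hit until the failpoint is reconfigured or disabled.
configure_failpoint("before-upload", "pause").unwrap();

// Reconfigure: sleep 500 ms on every hit instead.
configure_failpoint("before-upload", "sleep(500)").unwrap();

// Return a value; the call site maps it to a return value via its closure.
configure_failpoint("before-upload", "return(simulated_error)").unwrap();

// Disable the failpoint again.
configure_failpoint("before-upload", "off").unwrap();
```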
## Probability-Based Actions
The library supports probability-based failpoints that trigger only a percentage of the time:
```rust
// 50% chance to return a value
configure_failpoint("random_failure", "50%return(error)").unwrap();
// 10% chance to sleep, maximum 3 times
configure_failpoint("occasional_delay", "10%3*sleep(1000)").unwrap();
// 25% chance to panic
configure_failpoint("rare_panic", "25%panic(critical error)").unwrap();
```
The probability system uses a counter to track how many times a probability-based action has been triggered, allowing for precise control over test scenarios.
## Dynamic Behavior
When a failpoint is reconfigured while tasks are waiting on it:
- **Paused tasks** will immediately resume and continue normal execution
- **Sleeping tasks** will wake up early and continue normal execution
- **Removed failpoints** will cause all waiting tasks to resume normally
The new configuration only applies to future hits of the failpoint, not to tasks that are already waiting. This allows for flexible testing scenarios where you can pause execution, inspect state, and then resume execution dynamically.
## Example: Dynamic Reconfiguration
```rust
use neon_failpoint::{configure_failpoint, failpoint, FailpointResult};
use tokio::time::Duration;
// Start a task that will hit a failpoint
let task = tokio::spawn(async {
println!("About to hit failpoint");
let result = match failpoint("test_pause", None) {
either::Either::Left(result) => result,
either::Either::Right(future) => future.await,
};
match result {
FailpointResult::Return(value) => println!("Returned: {}", value),
FailpointResult::Continue => println!("Continued normally"),
FailpointResult::Cancelled => println!("Cancelled"),
}
});
// Configure the failpoint to pause
configure_failpoint("test_pause", "pause").unwrap();
// Let the task hit the failpoint and pause
tokio::time::sleep(Duration::from_millis(10)).await;
// Change the failpoint configuration - this will wake up the paused task
// The task will resume and continue normally (not apply the new config)
configure_failpoint("test_pause", "return(not_applied)").unwrap();
// The task will complete with Continue, not Return
let result = task.await.unwrap();
```
## Basic Usage
```rust
use neon_failpoint::{configure_failpoint, failpoint, FailpointResult};
// Configure a failpoint
configure_failpoint("my_failpoint", "return(42)").unwrap();
// Use the failpoint
// failpoint() returns either an immediate result or a future to await
let result = match failpoint("my_failpoint", None) {
either::Either::Left(result) => result,
either::Either::Right(future) => future.await,
};
match result {
FailpointResult::Return(value) => {
println!("Failpoint returned: {}", value);
return value.parse().unwrap_or_default();
}
FailpointResult::Continue => {
// Continue normal execution
}
FailpointResult::Cancelled => {
// Handle cancellation
}
}
```
## Context-Based Failpoint Configuration
Context allows you to create **conditional failpoints** that only trigger when specific runtime conditions are met. This is particularly useful for testing scenarios where you want to inject failures only for specific tenants, operations, or other contextual conditions.
### Configuring Context-Based Failpoints
Use `configure_failpoint_with_context()` to set up failpoints with context matching:
```rust
use neon_failpoint::configure_failpoint_with_context;
use std::collections::HashMap;
let mut context_matchers = HashMap::new();
context_matchers.insert("tenant_id".to_string(), "test_.*".to_string());
context_matchers.insert("operation".to_string(), "backup".to_string());
configure_failpoint_with_context(
"backup_operation", // failpoint name
"return(simulated_failure)", // action to take
context_matchers // context matching rules
).unwrap();
```
### Context Matching Rules
The context matching system works as follows:
1. **Key-Value Matching**: Each entry in `context_matchers` specifies a key that must exist in the runtime context
2. **Regex Support**: Values in `context_matchers` are treated as regular expressions first
3. **Fallback to Exact Match**: If the regex compilation fails, it falls back to exact string matching
4. **ALL Must Match**: All context matchers must match for the failpoint to trigger
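As a rough illustration of these rules (a sketch only, not the crate's actual implementation), the evaluation can be thought of as:
```rust
use std::collections::HashMap;
use regex::Regex;

/// Illustrative only: how a configured matcher set is conceptually evaluated
/// against the context supplied at the failpoint call site.
fn context_matches(
    matchers: &HashMap<String, String>,
    context: &HashMap<String, String>,
) -> bool {
    matchers.iter().all(|(key, pattern)| {
        // The key must be present in the runtime context at all.
        let Some(value) = context.get(key) else {
            return false;
        };
        match Regex::new(pattern) {
            // Pattern compiles: treat it as a regex.
            Ok(re) => re.is_match(value),
            // Pattern does not compile: fall back to exact string matching.
            Err(_) => value == pattern,
        }
    })
}
```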
### Runtime Context Usage
When code hits a failpoint, it provides context using a `HashMap<String, String>`:
```rust
use neon_failpoint::{failpoint, FailpointResult};
use std::collections::HashMap;
let mut context = HashMap::new();
context.insert("tenant_id".to_string(), "test_123".to_string());
context.insert("operation".to_string(), "backup".to_string());
context.insert("user_id".to_string(), "user_456".to_string());
match failpoint("backup_operation", Some(&context)) {
either::Either::Left(result) => {
match result {
FailpointResult::Return(value) => {
// This will only trigger if ALL context matchers match
println!("Backup failed: {}", value);
}
FailpointResult::Continue => {
// Continue with normal backup operation
}
FailpointResult::Cancelled => {}
}
}
either::Either::Right(future) => {
match future.await {
FailpointResult::Return(value) => {
// This will only trigger if ALL context matchers match
println!("Backup failed: {}", value);
}
FailpointResult::Continue => {
// Continue with normal backup operation
}
FailpointResult::Cancelled => {}
}
}
}
```
### Context Matching Examples
#### Regex Matching
```rust
// Configure to match test tenants only
let mut matchers = HashMap::new();
matchers.insert("tenant_id".to_string(), "test_.*".to_string());
configure_failpoint_with_context("test_failpoint", "pause", matchers).unwrap();
// This will match
let mut context = HashMap::new();
context.insert("tenant_id".to_string(), "test_123".to_string());
// This will NOT match
let mut context = HashMap::new();
context.insert("tenant_id".to_string(), "prod_123".to_string());
```
#### Multiple Conditions
```rust
// Must match BOTH tenant pattern AND operation
let mut matchers = HashMap::new();
matchers.insert("tenant_id".to_string(), "test_.*".to_string());
matchers.insert("operation".to_string(), "backup".to_string());
configure_failpoint_with_context("backup_test", "return(failed)", matchers).unwrap();
// This will match (both conditions met)
let mut context = HashMap::new();
context.insert("tenant_id".to_string(), "test_123".to_string());
context.insert("operation".to_string(), "backup".to_string());
// This will NOT match (missing operation)
let mut context = HashMap::new();
context.insert("tenant_id".to_string(), "test_123".to_string());
context.insert("operation".to_string(), "restore".to_string());
```
#### Exact String Matching
```rust
// If regex compilation fails, falls back to exact match
let mut matchers = HashMap::new();
matchers.insert("env".to_string(), "staging".to_string());
configure_failpoint_with_context("env_specific", "sleep(1000)", matchers).unwrap();
// This will match
let mut context = HashMap::new();
context.insert("env".to_string(), "staging".to_string());
// This will NOT match
let mut context = HashMap::new();
context.insert("env".to_string(), "production".to_string());
```
### Benefits of Context-Based Failpoints
1. **Selective Testing**: Only inject failures for specific tenants, environments, or operations
2. **Production Safety**: Avoid accidentally triggering failpoints in production by using context filters
3. **Complex Scenarios**: Test interactions between different components with targeted failures
4. **Debugging**: Isolate issues to specific contexts without affecting the entire system
### Context vs. Non-Context Failpoints
- **Without context**: `configure_failpoint("name", "action")` - triggers for ALL hits
- **With context**: `configure_failpoint_with_context("name", "action", matchers)` - triggers only when context matches
## Context-Specific Failpoints
```rust
use neon_failpoint::{configure_failpoint_with_context, failpoint};
use std::collections::HashMap;
// Configure a failpoint that only triggers for specific tenants
let mut context_matchers = HashMap::new();
context_matchers.insert("tenant_id".to_string(), "test_.*".to_string());
context_matchers.insert("operation".to_string(), "backup".to_string());
configure_failpoint_with_context(
"backup_operation",
"return(simulated_failure)",
context_matchers
).unwrap();
// Use with context
let mut context = HashMap::new();
context.insert("tenant_id".to_string(), "test_123".to_string());
context.insert("operation".to_string(), "backup".to_string());
match failpoint("backup_operation", Some(&context)) {
either::Either::Left(result) => {
match result {
FailpointResult::Return(value) => {
// This will trigger for tenant_id matching "test_.*"
println!("Backup failed: {}", value);
}
FailpointResult::Continue => {
// Continue with backup
}
FailpointResult::Cancelled => {}
}
}
either::Either::Right(future) => {
match future.await {
FailpointResult::Return(value) => {
// This will trigger for tenant_id matching "test_.*"
println!("Backup failed: {}", value);
}
FailpointResult::Continue => {
// Continue with backup
}
FailpointResult::Cancelled => {}
}
}
}
```
## Macros
The library provides convenient macros for common patterns:
### `fail_point!` - Basic Failpoint Macro
The `fail_point!` macro has three variants:
1. **Simple failpoint** - `fail_point!(name)`
- Just checks the failpoint and continues; pause/sleep actions are honored, but it cannot return early
- Panics if the failpoint is configured with `return(value)` since no closure is provided
2. **Failpoint with return handler** - `fail_point!(name, closure)`
- Provides a closure to handle return values from the failpoint
- The closure receives `Option<String>` and should return the appropriate value
3. **Conditional failpoint** - `fail_point!(name, condition, closure)`
- Only checks the failpoint if the condition is true
- Provides a closure to handle return values (receives `Option<String>`, same as the two-argument form)
```rust
use neon_failpoint::fail_point;
// Simple failpoint - just continue or return early
fail_point!("my_failpoint");
// Failpoint with return value handling
fail_point!("my_failpoint", |value: Option<String>| {
match value {
Some(v) => {
println!("Got value: {}", v);
return Ok(v.parse().unwrap_or_default());
}
None => return Ok(42), // Default return value
}
});
// Conditional failpoint - only check if condition is met
let should_fail = some_condition();
fail_point!("conditional_failpoint", should_fail, |value: &str| {
println!("Conditional failpoint triggered with: {}", value);
return Err(anyhow::anyhow!("Simulated failure"));
});
```
### `fail_point_with_context!` - Context-Aware Failpoint Macro
The `fail_point_with_context!` macro has three variants that mirror `fail_point!` but include context:
1. **Simple with context** - `fail_point_with_context!(name, context)`
2. **With context and return handler** - `fail_point_with_context!(name, context, closure)`
3. **Conditional with context** - `fail_point_with_context!(name, context, condition, closure)`
```rust
use neon_failpoint::{fail_point_with_context};
use std::collections::HashMap;
let mut context = HashMap::new();
context.insert("tenant_id".to_string(), "test_123".to_string());
context.insert("operation".to_string(), "backup".to_string());
// Simple context failpoint
fail_point_with_context!("backup_failpoint", &context);
// Context failpoint with return handler
fail_point_with_context!("backup_failpoint", &context, |value: Option<String>| {
match value {
Some(v) => return Err(anyhow::anyhow!("Backup failed: {}", v)),
None => return Err(anyhow::anyhow!("Backup failed")),
}
});
// Conditional context failpoint
let is_test_tenant = tenant_id.starts_with("test_");
fail_point_with_context!("backup_failpoint", &context, is_test_tenant, |value: Option<String>| {
// Only triggers for test tenants
return Err(anyhow::anyhow!("Test tenant backup failure"));
});
```
### Other Utility Macros
```rust
use neon_failpoint::{pausable_failpoint, sleep_millis_async};
use std::collections::HashMap;
use tokio_util::sync::CancellationToken;
// Pausable failpoint with cancellation (the macro awaits internally)
let cancel_token = CancellationToken::new();
if let Err(()) = pausable_failpoint!("pause_here", &cancel_token) {
println!("Failpoint was cancelled");
}
// Sleep failpoint (also awaits internally)
sleep_millis_async!("sleep_here", &cancel_token);
// Building a context map for the context-aware macros
let mut context = HashMap::new();
context.insert("key1".to_string(), "value1".to_string());
context.insert("key2".to_string(), "value2".to_string());
```
### Argument Reference
- **`name`**: String literal - the name of the failpoint
- **`context`**: Expression that evaluates to `&HashMap<String, String>` - context for matching
- **`condition`**: Boolean expression - only check failpoint if true
- **`closure`**: Closure that handles return values:
- For `fail_point!` with closure: receives `Option<String>`
- For conditional variants: receives `Option<String>` as well
- For `fail_point_with_context!` with closure: receives `Option<String>`
- **`cancel`**: `&CancellationToken` - for cancellation support
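Putting these argument shapes together, here is a sketch of the two-argument and conditional forms inside an async function; the function name, failpoint names, and error messages are invented for illustration:
```rust
use anyhow::bail;
use neon_failpoint::fail_point;

// Hypothetical upload step, used only to show the macro arguments in context.
async fn upload_segment(segment_no: u64) -> anyhow::Result<()> {
    // Two-argument form: the closure receives Option<String> and must produce
    // this function's return type.
    fail_point!("upload-segment", |value: Option<String>| {
        bail!("failpoint upload-segment: {:?}", value)
    });

    // Conditional form: only consult the failpoint for the first segment.
    fail_point!("upload-first-segment", segment_no == 0, |_| {
        bail!("failpoint upload-first-segment hit")
    });

    // ... the real upload work would go here ...
    Ok(())
}
```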
## Migration from `fail` crate
The library provides a compatibility layer in `libs/utils/src/failpoint_support.rs`. Most existing code should work without changes, but you can migrate to the new async APIs for better performance:
### Before (with `fail` crate):
```rust
use utils::failpoint_support::pausable_failpoint;
// This used spawn_blocking internally
pausable_failpoint!("my_failpoint", &cancel_token).await?;
```
### After (with `neon_failpoint`):
```rust
use neon_failpoint::{failpoint_with_cancellation, FailpointResult};
// This is fully async
let result = match failpoint_with_cancellation("my_failpoint", None, &cancel_token) {
either::Either::Left(result) => result,
either::Either::Right(future) => future.await,
};
match result {
FailpointResult::Continue => {},
FailpointResult::Cancelled => return Err(()),
FailpointResult::Return(_) => {},
}
```
## Environment Variable Support
Failpoints can be configured via the `FAILPOINTS` environment variable:
```bash
FAILPOINTS="failpoint1=return(42);failpoint2=sleep(1000);failpoint3=exit"
```
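Mirroring how compute_ctl and the pageserver call it in this PR, the variable is parsed once at process startup via `init()`; a minimal sketch:
```rust
fn main() -> anyhow::Result<()> {
    // Parse FAILPOINTS and apply each name=actions pair before any worker
    // threads or tasks start hitting failpoints.
    neon_failpoint::init().unwrap();

    // ... start the runtime, servers, etc.
    Ok(())
}
```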
## Testing
The library includes comprehensive tests and examples. Run them with:
```bash
cargo test --features testing
cargo run --example context_demo --features testing
```
## HTTP Configuration
The library integrates with the existing HTTP failpoint configuration API. Send POST requests to `/v1/failpoints` with:
```json
[
{
"name": "my_failpoint",
"actions": "return(42)"
}
]
```

View File

@@ -0,0 +1,82 @@
use neon_failpoint::{configure_failpoint_with_context, failpoint, FailpointResult};
use std::collections::HashMap;
#[tokio::main]
async fn main() {
// Initialize tracing for better output
tracing_subscriber::fmt::init();
// Set up a context-specific failpoint
let mut context_matchers = HashMap::new();
context_matchers.insert("tenant_id".to_string(), "test_.*".to_string());
context_matchers.insert("operation".to_string(), "backup".to_string());
configure_failpoint_with_context(
"backup_operation",
"return(simulated_failure)",
context_matchers,
)
.unwrap();
// Test with matching context
let mut context = HashMap::new();
context.insert("tenant_id".to_string(), "test_123".to_string());
context.insert("operation".to_string(), "backup".to_string());
println!("Testing with matching context...");
match failpoint("backup_operation", Some(&context)) {
either::Either::Left(result) => match result {
FailpointResult::Return(value) => {
println!("Failpoint triggered with value: {value:?}");
}
FailpointResult::Continue => {
println!("Failpoint not triggered");
}
FailpointResult::Cancelled => {
println!("Failpoint cancelled");
}
},
either::Either::Right(future) => match future.await {
FailpointResult::Return(value) => {
println!("Failpoint triggered with value: {value:?}");
}
FailpointResult::Continue => {
println!("Failpoint not triggered");
}
FailpointResult::Cancelled => {
println!("Failpoint cancelled");
}
},
}
// Test with non-matching context
let mut context = HashMap::new();
context.insert("tenant_id".to_string(), "prod_456".to_string());
context.insert("operation".to_string(), "backup".to_string());
println!("Testing with non-matching context...");
match failpoint("backup_operation", Some(&context)) {
either::Either::Left(result) => match result {
FailpointResult::Return(value) => {
println!("Failpoint triggered with value: {value:?}");
}
FailpointResult::Continue => {
println!("Failpoint not triggered (expected)");
}
FailpointResult::Cancelled => {
println!("Failpoint cancelled");
}
},
either::Either::Right(future) => match future.await {
FailpointResult::Return(value) => {
println!("Failpoint triggered with value: {value:?}");
}
FailpointResult::Continue => {
println!("Failpoint not triggered (expected)");
}
FailpointResult::Cancelled => {
println!("Failpoint cancelled");
}
},
}
}

File diff suppressed because it is too large

View File

@@ -0,0 +1,356 @@
//! Macros for convenient failpoint usage
/// Simple failpoint macro - async version that doesn't require spawn_blocking
#[macro_export]
macro_rules! fail_point {
($name:literal) => {{
if cfg!(feature = "testing") {
match $crate::failpoint($name, None) {
$crate::either::Either::Left(result) => {
match result {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(_) => {
panic!("failpoint was configured with return(X) but Rust code does not pass a closure to map X to a return value");
},
$crate::FailpointResult::Cancelled => {},
}
},
$crate::either::Either::Right(future) => {
match future.await {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(_) => {
panic!("failpoint was configured with return(X) but Rust code does not pass a closure to map X to a return value");
},
$crate::FailpointResult::Cancelled => {},
}
},
}
}
}};
($name:literal, $closure:expr) => {{
if cfg!(feature = "testing") {
match $crate::failpoint($name, None) {
$crate::either::Either::Left(result) => {
match result {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
let closure = $closure;
return closure(value);
},
$crate::FailpointResult::Cancelled => {},
}
},
$crate::either::Either::Right(future) => {
match future.await {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
let closure = $closure;
return closure(value);
},
$crate::FailpointResult::Cancelled => {},
}
},
}
}
}};
($name:literal, $condition:expr, $closure:expr) => {{
if cfg!(feature = "testing") {
if $condition {
match $crate::failpoint($name, None) {
$crate::either::Either::Left(result) => {
match result {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
let closure = $closure;
return closure(value);
},
$crate::FailpointResult::Cancelled => {},
}
},
$crate::either::Either::Right(future) => {
match future.await {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
let closure = $closure;
return closure(value);
},
$crate::FailpointResult::Cancelled => {},
}
},
}
}
}
}};
}
/// Simple failpoint macro - sync version that panics if async action is triggered
#[macro_export]
macro_rules! fail_point_sync {
($name:literal) => {{
if cfg!(feature = "testing") {
match $crate::failpoint($name, None) {
$crate::either::Either::Left(result) => {
match result {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(_) => {
panic!("failpoint was configured with return(X) but Rust code does not pass a closure to map X to a return value");
},
$crate::FailpointResult::Cancelled => {},
}
},
$crate::either::Either::Right(_) => {
panic!("failpoint '{}' triggered an async action (sleep/pause) but fail_point_sync! was used. Use fail_point! instead.", $name);
},
}
}
}};
($name:literal, $closure:expr) => {{
if cfg!(feature = "testing") {
match $crate::failpoint($name, None) {
$crate::either::Either::Left(result) => {
match result {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
let closure = $closure;
return closure(value);
},
$crate::FailpointResult::Cancelled => {},
}
},
$crate::either::Either::Right(_) => {
panic!("failpoint '{}' triggered an async action (sleep/pause) but fail_point_sync! was used. Use fail_point! instead.", $name);
},
}
}
}};
($name:literal, $condition:expr, $closure:expr) => {{
if cfg!(feature = "testing") {
if $condition {
match $crate::failpoint($name, None) {
$crate::either::Either::Left(result) => {
match result {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
let closure = $closure;
return closure(value);
},
$crate::FailpointResult::Cancelled => {},
}
},
$crate::either::Either::Right(_) => {
panic!("failpoint '{}' triggered an async action (sleep/pause) but fail_point_sync! was used. Use fail_point! instead.", $name);
},
}
}
}
}};
}
/// Failpoint macro with context support
#[macro_export]
macro_rules! fail_point_with_context {
($name:literal, $context:expr) => {{
if cfg!(feature = "testing") {
match $crate::failpoint($name, Some($context)) {
$crate::either::Either::Left(result) => {
match result {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(_) => {
panic!("failpoint was configured with return(X) but Rust code does not pass a closure to map X to a return value");
},
$crate::FailpointResult::Cancelled => {},
}
},
$crate::either::Either::Right(future) => {
match future.await {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(_) => {
panic!("failpoint was configured with return(X) but Rust code does not pass a closure to map X to a return value");
},
$crate::FailpointResult::Cancelled => {},
}
},
}
}
}};
($name:literal, $context:expr, $closure:expr) => {{
if cfg!(feature = "testing") {
match $crate::failpoint($name, Some($context)) {
$crate::either::Either::Left(result) => {
match result {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
let closure = $closure;
return closure(value);
},
$crate::FailpointResult::Cancelled => {},
}
},
$crate::either::Either::Right(future) => {
match future.await {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
let closure = $closure;
return closure(value);
},
$crate::FailpointResult::Cancelled => {},
}
},
}
}
}};
($name:literal, $context:expr, $condition:expr, $closure:expr) => {{
if cfg!(feature = "testing") {
if $condition {
match $crate::failpoint($name, Some($context)) {
$crate::either::Either::Left(result) => {
match result {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
let closure = $closure;
return closure(value);
},
$crate::FailpointResult::Cancelled => {},
}
},
$crate::either::Either::Right(future) => {
match future.await {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
let closure = $closure;
return closure(value);
},
$crate::FailpointResult::Cancelled => {},
}
},
}
}
}
}};
}
/// Failpoint macro with context support - sync version
#[macro_export]
macro_rules! fail_point_with_context_sync {
($name:literal, $context:expr) => {{
if cfg!(feature = "testing") {
match $crate::failpoint($name, Some($context)) {
$crate::either::Either::Left(result) => {
match result {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(_) => {
panic!("failpoint was configured with return(X) but Rust code does not pass a closure to map X to a return value");
},
$crate::FailpointResult::Cancelled => {},
}
},
$crate::either::Either::Right(_) => {
panic!("failpoint '{}' triggered an async action (sleep/pause) but fail_point_with_context_sync! was used. Use fail_point_with_context! instead.", $name);
},
}
}
}};
($name:literal, $context:expr, $closure:expr) => {{
if cfg!(feature = "testing") {
match $crate::failpoint($name, Some($context)) {
$crate::either::Either::Left(result) => {
match result {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
let closure = $closure;
return closure(value);
},
$crate::FailpointResult::Cancelled => {},
}
},
$crate::either::Either::Right(_) => {
panic!("failpoint '{}' triggered an async action (sleep/pause) but fail_point_with_context_sync! was used. Use fail_point_with_context! instead.", $name);
},
}
}
}};
($name:literal, $context:expr, $condition:expr, $closure:expr) => {{
if cfg!(feature = "testing") {
if $condition {
match $crate::failpoint($name, Some($context)) {
$crate::either::Either::Left(result) => {
match result {
$crate::FailpointResult::Continue => {},
$crate::FailpointResult::Return(value) => {
let closure = $closure;
return closure(value);
},
$crate::FailpointResult::Cancelled => {},
}
},
$crate::either::Either::Right(_) => {
panic!("failpoint '{}' triggered an async action (sleep/pause) but fail_point_with_context_sync! was used. Use fail_point_with_context! instead.", $name);
},
}
}
}
}};
}
/// Pausable failpoint macro - equivalent to the old pausable_failpoint
#[macro_export]
macro_rules! pausable_failpoint {
($name:literal) => {{
if cfg!(feature = "testing") {
let cancel = ::tokio_util::sync::CancellationToken::new();
let _ = $crate::pausable_failpoint!($name, &cancel);
}
}};
($name:literal, $cancel:expr) => {{
if cfg!(feature = "testing") {
::tracing::info!("at failpoint {}", $name); // tests rely on this
match $crate::failpoint_with_cancellation($name, None, $cancel) {
$crate::either::Either::Left(result) => match result {
$crate::FailpointResult::Continue => Ok(()),
$crate::FailpointResult::Return(_) => Ok(()),
$crate::FailpointResult::Cancelled => Err(()),
},
$crate::either::Either::Right(future) => match future.await {
$crate::FailpointResult::Continue => Ok(()),
$crate::FailpointResult::Return(_) => Ok(()),
$crate::FailpointResult::Cancelled => Err(()),
},
}
} else {
Ok(())
}
}};
}
/// Sleep failpoint macro - for async sleep operations
#[macro_export]
macro_rules! sleep_millis_async {
($name:literal) => {{
if cfg!(feature = "testing") {
match $crate::failpoint($name, None) {
$crate::either::Either::Left(_) => {}
$crate::either::Either::Right(future) => {
future.await;
}
}
}
}};
($name:literal, $cancel:expr) => {{
if cfg!(feature = "testing") {
match $crate::failpoint_with_cancellation($name, None, $cancel) {
$crate::either::Either::Left(_) => {}
$crate::either::Either::Right(future) => {
future.await;
}
}
}
}};
}
// Re-export for convenience
pub use fail_point;
pub use fail_point_sync;
pub use fail_point_with_context;
pub use fail_point_with_context_sync;
pub use pausable_failpoint;
pub use sleep_millis_async;

View File

@@ -9,7 +9,7 @@ default = ["rename_noreplace"]
rename_noreplace = []
# Enables test-only APIs, including failpoints. In particular, enables the `fail_point!` macro,
# which adds some runtime cost to run tests on outage conditions
testing = ["fail/failpoints"]
testing = ["neon_failpoint/testing"]
[dependencies]
arc-swap.workspace = true
@@ -21,10 +21,11 @@ bytes.workspace = true
camino.workspace = true
chrono.workspace = true
diatomic-waker.workspace = true
either.workspace = true
git-version.workspace = true
hex = { workspace = true, features = ["serde"] }
humantime.workspace = true
fail.workspace = true
neon_failpoint.workspace = true
futures = { workspace = true }
jsonwebtoken.workspace = true
nix = { workspace = true, features = ["ioctl"] }

View File

@@ -1,59 +1,22 @@
//! Failpoint support code shared between pageserver and safekeepers.
//!
//! This module provides a compatibility layer over the new neon_failpoint crate.
pub use neon_failpoint::{configure_failpoint as apply_failpoint, has_failpoints, init};
use tokio_util::sync::CancellationToken;
/// Declare a failpoint that can use to `pause` failpoint action.
/// We don't want to block the executor thread, hence, spawn_blocking + await.
///
/// Optionally pass a cancellation token, and this failpoint will drop out of
/// its pause when the cancellation token fires. This is useful for testing
/// cases where we would like to block something, but test its clean shutdown behavior.
/// The macro evaluates to a Result in that case, where Ok(()) is the case
/// where the failpoint was not paused, and Err() is the case where cancellation
/// token fired while evaluating the failpoint.
///
/// Remember to unpause the failpoint in the test; until that happens, one of the
/// limited number of spawn_blocking thread pool threads is leaked.
/// Merely forwards to neon_failpoint::pausable_failpoint
#[macro_export]
macro_rules! pausable_failpoint {
($name:literal) => {{
if cfg!(feature = "testing") {
let cancel = ::tokio_util::sync::CancellationToken::new();
let _ = $crate::pausable_failpoint!($name, &cancel);
}
}};
($name:literal, $cancel:expr) => {{
if cfg!(feature = "testing") {
let failpoint_fut = ::tokio::task::spawn_blocking({
let current = ::tracing::Span::current();
move || {
let _entered = current.entered();
::tracing::info!("at failpoint {}", $name);
::fail::fail_point!($name);
}
});
let cancel_fut = async move {
$cancel.cancelled().await;
};
::tokio::select! {
res = failpoint_fut => {
res.expect("spawn_blocking");
// continue with execution
Ok(())
},
_ = cancel_fut => {
Err(())
}
}
} else {
Ok(())
}
}};
($name:literal) => {
::neon_failpoint::pausable_failpoint!($name)
};
($name:literal, $cancel:expr) => {
::neon_failpoint::pausable_failpoint!($name, $cancel)
};
}
pub use pausable_failpoint;
/// use with fail::cfg("$name", "return(2000)")
/// DEPRECATED! - use with fail::cfg("$name", "return(2000)")
///
/// The effect is similar to a "sleep(2000)" action, i.e. we sleep for the
/// specified time (in milliseconds). The main difference is that we use async
@@ -69,7 +32,7 @@ macro_rules! __failpoint_sleep_millis_async {
// If the failpoint is used with a "return" action, set should_sleep to the
// returned value (as string). Otherwise it's set to None.
let should_sleep = (|| {
::fail::fail_point!($name, |x| x);
::neon_failpoint::fail_point_sync!($name, |x| x);
::std::option::Option::None
})();
@@ -82,7 +45,7 @@ macro_rules! __failpoint_sleep_millis_async {
// If the failpoint is used with a "return" action, set should_sleep to the
// returned value (as string). Otherwise it's set to None.
let should_sleep = (|| {
::fail::fail_point!($name, |x| x);
::neon_failpoint::fail_point_sync!($name, |x| x);
::std::option::Option::None
})();
@@ -126,60 +89,3 @@ pub async fn failpoint_sleep_cancellable_helper(
tokio::time::timeout(d, cancel.cancelled()).await.ok();
tracing::info!("failpoint {:?}: sleep done", name);
}
/// Initialize the configured failpoints
///
/// You must call this function before any concurrent threads do operations.
pub fn init() -> fail::FailScenario<'static> {
// The failpoints lib provides support for parsing the `FAILPOINTS` env var.
// We want non-default behavior for `exit`, though, so, we handle it separately.
//
// Format for FAILPOINTS is "name=actions" separated by ";".
let actions = std::env::var("FAILPOINTS");
if actions.is_ok() {
// SAFETY: this function should before any threads start and access env vars concurrently
unsafe {
std::env::remove_var("FAILPOINTS");
}
} else {
// let the library handle non-utf8, or nothing for not present
}
let scenario = fail::FailScenario::setup();
if let Ok(val) = actions {
val.split(';')
.enumerate()
.map(|(i, s)| s.split_once('=').ok_or((i, s)))
.for_each(|res| {
let (name, actions) = match res {
Ok(t) => t,
Err((i, s)) => {
panic!(
"startup failpoints: missing action on the {}th failpoint; try `{s}=return`",
i + 1,
);
}
};
if let Err(e) = apply_failpoint(name, actions) {
panic!("startup failpoints: failed to apply failpoint {name}={actions}: {e}");
}
});
}
scenario
}
pub fn apply_failpoint(name: &str, actions: &str) -> Result<(), String> {
if actions == "exit" {
fail::cfg_callback(name, exit_failpoint)
} else {
fail::cfg(name, actions)
}
}
#[inline(never)]
fn exit_failpoint() {
tracing::info!("Exit requested by failpoint");
std::process::exit(1);
}

View File

@@ -8,7 +8,7 @@ license.workspace = true
default = []
# Enables test-only APIs, including failpoints. In particular, enables the `fail_point!` macro,
# which adds some runtime cost to run tests on outage conditions
testing = ["fail/failpoints", "pageserver_api/testing", "wal_decoder/testing", "pageserver_client/testing"]
testing = ["neon_failpoint/testing", "pageserver_api/testing", "wal_decoder/testing", "pageserver_client/testing"]
fuzz-read-path = ["testing"]
@@ -33,7 +33,7 @@ crc32c.workspace = true
either.workspace = true
enum-map.workspace = true
enumset = { workspace = true, features = ["serde"]}
fail.workspace = true
neon_failpoint.workspace = true
futures.workspace = true
hashlink.workspace = true
hex.workspace = true

View File

@@ -17,6 +17,7 @@ use anyhow::{Context, anyhow};
use async_compression::tokio::write::GzipEncoder;
use bytes::{BufMut, Bytes, BytesMut};
use fail::fail_point;
use neon_failpoint as fail;
use pageserver_api::key::{Key, rel_block_to_key};
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::pg_constants::{PG_HBA, PGDATA_SPECIAL_FILES};

View File

@@ -68,7 +68,7 @@ const FEATURES: &[&str] = &[
fn version() -> String {
format!(
"{GIT_VERSION} failpoints: {}, features: {:?}",
fail::has_failpoints(),
neon_failpoint::has_failpoints(),
FEATURES,
)
}
@@ -84,7 +84,7 @@ fn main() -> anyhow::Result<()> {
}
// Initialize failpoints support
let scenario = failpoint_support::init();
failpoint_support::init().unwrap();
let workdir = arg_matches
.get_one::<String>("workdir")
@@ -221,7 +221,6 @@ fn main() -> anyhow::Result<()> {
start_pageserver(launch_ts, conf, ignored, otel_guard).context("Failed to start pageserver")?;
scenario.teardown();
Ok(())
}
@@ -366,16 +365,9 @@ fn start_pageserver(
// If any failpoints were set from FAILPOINTS environment variable,
// print them to the log for debugging purposes
let failpoints = fail::list();
if !failpoints.is_empty() {
info!(
"started with failpoints: {}",
failpoints
.iter()
.map(|(name, actions)| format!("{name}={actions}"))
.collect::<Vec<String>>()
.join(";")
)
let failpoints = neon_failpoint::list();
for (name, actions) in failpoints {
info!("starting with failpoint: {name} {actions}");
}
// Create and lock PID file. This ensures that there cannot be more than one

View File

@@ -6,6 +6,8 @@ use camino::{Utf8Path, Utf8PathBuf};
use super::{NewMetricsRoot, NewRawMetric, RawMetric};
use crate::consumption_metrics::NewMetricsRefRoot;
use neon_failpoint as fail;
pub(super) fn read_metrics_from_serde_value(
json_value: serde_json::Value,
) -> anyhow::Result<Vec<NewRawMetric>> {
@@ -129,7 +131,7 @@ pub(super) async fn flush_metrics_to_disk(
tempfile.flush()?;
tempfile.as_file().sync_all()?;
fail::fail_point!("before-persist-last-metrics-collected");
fail::fail_point_sync!("before-persist-last-metrics-collected");
drop(tempfile.persist(&*path).map_err(|e| e.error)?);

View File

@@ -8,6 +8,7 @@
use std::time::Duration;
use neon_failpoint as fail;
use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel};
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};

View File

@@ -28,6 +28,7 @@ use http_utils::{RequestExt, RouterBuilder};
use humantime::format_rfc3339;
use hyper::{Body, Request, Response, StatusCode, Uri, header};
use metrics::launch_timestamp::LaunchTimestamp;
use neon_failpoint as fail;
use pageserver_api::models::virtual_file::IoMode;
use pageserver_api::models::{
DetachBehavior, DownloadRemoteLayersTaskSpawnRequest, IngestAuxFilesRequest,
@@ -3975,7 +3976,7 @@ pub fn make_router(
.get("/profile/cpu", |r| request_span(r, profile_cpu_handler))
.get("/profile/heap", |r| request_span(r, profile_heap_handler))
.get("/v1/status", |r| api_handler(r, status_handler))
.put("/v1/failpoints", |r| {
.post("/v1/failpoints", |r| {
testing_api_handler("manage failpoints", r, failpoints_handler)
})
.post("/v1/reload_auth_validation_keys", |r| {

View File

@@ -19,6 +19,7 @@ use futures::future::BoxFuture;
use futures::{FutureExt, Stream};
use itertools::Itertools;
use jsonwebtoken::TokenData;
use neon_failpoint as fail;
use once_cell::sync::OnceCell;
use pageserver_api::config::{
GetVectoredConcurrentIo, PageServicePipeliningConfig, PageServicePipeliningConfigPipelined,
@@ -335,18 +336,21 @@ async fn page_service_conn_main(
let default_timeout_ms = 10 * 60 * 1000; // 10 minutes by default
let socket_timeout_ms = (|| {
fail::fail_point!("simulated-bad-compute-connection", |avg_timeout_ms| {
// Exponential distribution for simulating
// poor network conditions, expect about avg_timeout_ms to be around 15
// in tests
if let Some(avg_timeout_ms) = avg_timeout_ms {
let avg = avg_timeout_ms.parse::<i64>().unwrap() as f32;
let u = rand::random::<f32>();
((1.0 - u).ln() / (-avg)) as u64
} else {
default_timeout_ms
fail::fail_point_sync!(
"simulated-bad-compute-connection",
|avg_timeout_ms: Option<String>| {
// Exponential distribution for simulating
// poor network conditions, expect about avg_timeout_ms to be around 15
// in tests
if let Some(avg_timeout_ms) = avg_timeout_ms {
let avg = avg_timeout_ms.parse::<i64>().unwrap() as f32;
let u = rand::random::<f32>();
((1.0 - u).ln() / (-avg)) as u64
} else {
default_timeout_ms
}
}
});
);
default_timeout_ms
})();
@@ -3043,7 +3047,7 @@ where
_pgb: &mut PostgresBackend<IO>,
sm: &FeStartupPacket,
) -> Result<(), QueryError> {
fail::fail_point!("ps::connection-start::startup-packet");
fail::fail_point_sync!("ps::connection-start::startup-packet");
if let FeStartupPacket::StartupMessage { params, .. } = sm {
if let Some(app_name) = params.get("application_name") {

View File

@@ -14,6 +14,7 @@ use crate::{PERF_TRACE_TARGET, ensure_walingest};
use anyhow::Context;
use bytes::{Buf, Bytes, BytesMut};
use enum_map::Enum;
use neon_failpoint as fail;
use pageserver_api::key::{
AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, CompactKey, DBDIR_KEY, Key, RelDirExists,
TWOPHASEDIR_KEY, dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range,

View File

@@ -30,6 +30,7 @@ use enumset::EnumSet;
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use itertools::Itertools as _;
use neon_failpoint as fail;
use once_cell::sync::Lazy;
pub use pageserver_api::models::TenantState;
use pageserver_api::models::{self, RelSizeMigration};
@@ -9571,7 +9572,7 @@ mod tests {
writer.finish_write(Lsn(0x30));
drop(writer);
fail::cfg(
neon_failpoint::configure_failpoint(
"flush-layer-before-update-remote-consistent-lsn",
"return()",
)

View File

@@ -12,6 +12,7 @@ use anyhow::Context;
use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf};
use futures::StreamExt;
use itertools::Itertools;
use neon_failpoint as fail;
use pageserver_api::key::Key;
use pageserver_api::models::{DetachBehavior, LocationConfigMode};
use pageserver_api::shard::{

View File

@@ -194,6 +194,7 @@ pub(crate) use download::{
};
use index::GcCompactionState;
pub(crate) use index::LayerFileMetadata;
use neon_failpoint as fail;
use pageserver_api::models::{RelSizeMigration, TimelineArchivalState, TimelineVisibilityState};
use pageserver_api::shard::{ShardIndex, TenantShardId};
use regex::Regex;

View File

@@ -11,6 +11,7 @@ use std::time::SystemTime;
use anyhow::{Context, anyhow};
use camino::{Utf8Path, Utf8PathBuf};
use neon_failpoint as fail;
use pageserver_api::shard::TenantShardId;
use remote_storage::{
DownloadError, DownloadKind, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath,

View File

@@ -8,6 +8,7 @@ use anyhow::{Context, bail};
use bytes::Bytes;
use camino::Utf8Path;
use fail::fail_point;
use neon_failpoint as fail;
use pageserver_api::shard::TenantShardId;
use remote_storage::{GenericRemoteStorage, RemotePath, TimeTravelError};
use tokio::fs::{self, File};

View File

@@ -39,6 +39,7 @@ use layer_manager::{
LayerManagerLockHolder, LayerManagerReadGuard, LayerManagerWriteGuard, LockedLayerManager,
Shutdown,
};
use neon_failpoint as fail;
use once_cell::sync::Lazy;
use pageserver_api::config::tenant_conf_defaults::DEFAULT_PITR_INTERVAL;
@@ -5183,7 +5184,9 @@ impl Timeline {
*self.applied_gc_cutoff_lsn.read(),
);
fail_point!("checkpoint-before-saving-metadata", |x| bail!(
neon_failpoint::fail_point_sync!("checkpoint-before-saving-metadata", |x: Option<
String,
>| bail!(
"{}",
x.unwrap()
));

View File

@@ -26,6 +26,7 @@ use enumset::EnumSet;
use fail::fail_point;
use futures::FutureExt;
use itertools::Itertools;
use neon_failpoint as fail;
use once_cell::sync::Lazy;
use pageserver_api::config::tenant_conf_defaults::DEFAULT_CHECKPOINT_DISTANCE;
use pageserver_api::key::{KEY_SIZE, Key};

View File

@@ -2,6 +2,7 @@ use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use anyhow::Context;
use neon_failpoint as fail;
use pageserver_api::models::TimelineState;
use pageserver_api::shard::TenantShardId;
use remote_storage::DownloadError;

View File

@@ -4,6 +4,7 @@ use std::sync::Arc;
use anyhow::Context;
use bytes::Bytes;
use http_utils::error::ApiError;
use neon_failpoint as fail;
use pageserver_api::key::Key;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::DetachBehavior;
@@ -1113,7 +1114,7 @@ pub(super) async fn detach_and_reparent(
// others will fail as if those timelines had been stopped for whatever reason.
#[cfg(feature = "testing")]
let failpoint_sem = || -> Option<Arc<Semaphore>> {
fail::fail_point!("timeline-detach-ancestor::allow_one_reparented", |_| Some(
fail::fail_point_sync!("timeline-detach-ancestor::allow_one_reparented", |_| Some(
Arc::new(Semaphore::new(1))
));
None

View File

@@ -5,6 +5,7 @@ use std::sync::Arc;
use anyhow::Context;
use camino::Utf8PathBuf;
use neon_failpoint as fail;
use tracing::{error, info, info_span};
use utils::fs_ext;
use utils::id::TimelineId;

View File

@@ -11,6 +11,7 @@ use bytes::BytesMut;
use chrono::{NaiveDateTime, Utc};
use fail::fail_point;
use futures::StreamExt;
use neon_failpoint as fail;
use postgres_backend::is_expected_io_error;
use postgres_connection::PgConnectionConfig;
use postgres_ffi::WAL_SEGMENT_SIZE;

View File

@@ -6,9 +6,7 @@ license.workspace = true
[features]
default = []
# Enables test-only APIs, incuding failpoints. In particular, enables the `fail_point!` macro,
# which adds some runtime cost to run tests on outage conditions
testing = ["fail/failpoints"]
testing = ["neon_failpoint/testing"]
benchmarking = []
[dependencies]
@@ -21,7 +19,7 @@ camino-tempfile.workspace = true
chrono.workspace = true
clap = { workspace = true, features = ["derive"] }
crc32c.workspace = true
fail.workspace = true
neon_failpoint.workspace = true
hex.workspace = true
humantime.workspace = true
http.workspace = true

View File

@@ -65,7 +65,7 @@ const FEATURES: &[&str] = &[
fn version() -> String {
format!(
"{GIT_VERSION} failpoints: {}, features: {:?}",
fail::has_failpoints(),
neon_failpoint::has_failpoints(),
FEATURES,
)
}

View File

@@ -717,7 +717,7 @@ pub fn make_router(
.get("/profile/cpu", |r| request_span(r, profile_cpu_handler))
.get("/profile/heap", |r| request_span(r, profile_heap_handler))
.get("/v1/status", |r| request_span(r, status_handler))
.put("/v1/failpoints", |r| {
.post("/v1/failpoints", |r| {
request_span(r, move |r| async {
check_permission(&r, None)?;
let cancel = CancellationToken::new();
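
The management route switches from PUT to POST. The request body, as constructed by the test fixtures later in this diff, is a list of objects with `name`, `actions`, and an optional `context_matchers` map. An illustrative client-side sketch follows; the reqwest usage, port, and failpoint values are assumptions, only the body shape comes from this PR:

// Illustrative only: POST the new failpoint configuration format.
use serde_json::json;

async fn arm_failpoint(client: &reqwest::Client) -> anyhow::Result<()> {
    let body = json!([{
        "name": "checkpoint-before-saving-metadata",
        "actions": "return(boom)",
        "context_matchers": { "tenant_id": ".*test.*" }
    }]);
    client
        .post("http://localhost:9898/v1/failpoints") // port is illustrative
        .json(&body)
        .send()
        .await?
        .error_for_status()?;
    Ok(())
}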

View File

@@ -872,14 +872,15 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
async fn wait_wal(&mut self) -> Result<(), CopyStreamHandlerEnd> {
loop {
self.end_pos = self.end_watch.get();
let have_something_to_send = (|| {
fail::fail_point!(
let have_something_to_send = async {
neon_failpoint::fail_point!(
"sk-pause-send",
self.appname.as_deref() != Some("pageserver"),
|_| { false }
);
self.end_pos > self.start_pos
})();
}
.await;
if have_something_to_send {
trace!("got end_pos {:?}, streaming", self.end_pos);
@@ -931,14 +932,15 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> WalSender<'_, IO> {
/// - Err in case of error -- only if 1) term changed while fetching in recovery
/// mode 2) watch channel closed, which must never happen.
async fn wait_for_lsn(&mut self) -> anyhow::Result<Option<Lsn>> {
let fp = (|| {
fail::fail_point!(
let fp = async {
neon_failpoint::fail_point!(
"sk-pause-send",
self.appname.as_deref() != Some("pageserver"),
|_| { true }
);
false
})();
}
.await;
if fp {
tokio::time::sleep(POLL_STATE_TIMEOUT).await;
return Ok(None);
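
In both hunks the immediately-invoked closure becomes an `async` block that is awaited, which suggests `neon_failpoint::fail_point!` is async-aware (so actions such as sleeps or pauses can yield to the executor instead of blocking it), something the old sync closure could not do. A standalone sketch of the shape with the inputs simplified (illustrative, not the safekeeper's code):

// Illustrative shape only: evaluate an async-aware failpoint inline and fall
// through to the normal result when the failpoint is not armed.
async fn have_work(appname: Option<&str>, end_pos: u64, start_pos: u64) -> bool {
    async {
        // The second argument gates the failpoint on the caller, as in the diff.
        neon_failpoint::fail_point!(
            "sk-pause-send",
            appname != Some("pageserver"),
            |_| { false }
        );
        end_pos > start_pos
    }
    .await
}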

View File

@@ -657,7 +657,7 @@ pub async fn delete_timeline(
pausable_failpoint!("sk-delete-timeline-remote-pause");
fail::fail_point!("sk-delete-timeline-remote", |_| {
neon_failpoint::fail_point!("sk-delete-timeline-remote", |_| {
Err(anyhow::anyhow!("failpoint: sk-delete-timeline-remote"))
});

View File

@@ -301,7 +301,7 @@ impl PhysicalStorage {
format!("Failed to open tmp wal file {:?}", &tmp_path)
})?;
fail::fail_point!("sk-zero-segment", |_| {
neon_failpoint::fail_point!("sk-zero-segment", |_| {
info!("sk-zero-segment failpoint hit");
Err(anyhow::anyhow!("failpoint: sk-zero-segment"))
});

View File

@@ -22,7 +22,7 @@ clap.workspace = true
clashmap.workspace = true
compute_api.workspace = true
cron.workspace = true
fail.workspace = true
neon_failpoint.workspace = true
futures.workspace = true
governor.workspace = true
hex.workspace = true

View File

@@ -25,6 +25,7 @@ use futures::stream::FuturesUnordered;
use http_utils::error::ApiError;
use hyper::Uri;
use itertools::Itertools;
use neon_failpoint as fail;
use pageserver_api::config::PostHogConfig;
use pageserver_api::controller_api::{
AvailabilityZone, MetadataHealthRecord, MetadataHealthUpdateRequest, NodeAvailability,
@@ -6026,7 +6027,7 @@ impl Service {
tenant_id: TenantId,
split_req: TenantShardSplitRequest,
) -> Result<ShardSplitAction, ApiError> {
fail::fail_point!("shard-split-validation", |_| Err(ApiError::BadRequest(
fail::fail_point_sync!("shard-split-validation", |_| Err(ApiError::BadRequest(
anyhow::anyhow!("failpoint")
)));
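
Here the storage controller keeps the synchronous macro inside an async handler: the failpoint only needs to short-circuit validation with an `ApiError`, so no await point is required. A minimal standalone sketch of that shape (the function name is illustrative; the error construction matches the hunk above):

// Illustrative only: a sync failpoint short-circuiting an async handler.
use http_utils::error::ApiError;

async fn validate_split_request() -> Result<(), ApiError> {
    neon_failpoint::fail_point_sync!("shard-split-validation", |_| {
        Err(ApiError::BadRequest(anyhow::anyhow!("failpoint")))
    });
    // ... real validation continues here ...
    Ok(())
}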

View File

@@ -159,16 +159,59 @@ class EndpointHttpClient(requests.Session):
res.raise_for_status()
return res.json()
def configure_failpoints(self, *args: tuple[str, str]) -> None:
body: list[dict[str, str]] = []
def configure_failpoints(
self, *args: tuple[str, str] | list[dict[str, str | dict[str, str]]]
) -> None:
"""Configure failpoints for testing purposes.
for fp in args:
body.append(
{
"name": fp[0],
"action": fp[1],
Args:
*args: Can be one of:
- Variable number of (name, actions) tuples
- Single list of dicts with keys: name, actions, and optionally context_matchers
Examples:
# Basic failpoints
client.configure_failpoints(("test_fp", "return(error)"))
client.configure_failpoints(("fp1", "return"), ("fp2", "sleep(1000)"))
# Probability-based failpoint
client.configure_failpoints(("test_fp", "50%return(error)"))
# Context-based failpoint
client.configure_failpoints([{
"name": "test_fp",
"actions": "return(error)",
"context_matchers": {"tenant_id": ".*test.*"}
}])
"""
request_body: list[dict[str, Any]] = []
if (
len(args) == 1
and isinstance(args[0], list)
and args[0]
and isinstance(args[0][0], dict)
):
# Handle list of dicts (context-based failpoints)
failpoint_configs = args[0]
for config in failpoint_configs:
server_config: dict[str, Any] = {
"name": config["name"],
"actions": config["actions"],
}
)
if "context_matchers" in config:
server_config["context_matchers"] = config["context_matchers"]
request_body.append(server_config)
else:
# Handle tuples (basic failpoints)
for fp in args:
request_body.append(
{
"name": fp[0],
"actions": fp[1],
}
)
res = self.post(f"http://localhost:{self.internal_port}/failpoints", json=body)
res = self.post(f"http://localhost:{self.internal_port}/failpoints", json=request_body)
res.raise_for_status()

View File

@@ -2614,22 +2614,68 @@ class NeonStorageController(MetricsGetter, LogUtils):
)
return res.json()
def configure_failpoints(self, config_strings: tuple[str, str] | list[tuple[str, str]]):
if isinstance(config_strings, tuple):
pairs = [config_strings]
else:
pairs = config_strings
def configure_failpoints(
self,
config_strings: tuple[str, str]
| list[tuple[str, str]]
| list[dict[str, str | dict[str, str]]],
):
"""
Configure failpoints for testing purposes.
log.info(f"Requesting config failpoints: {repr(pairs)}")
Args:
config_strings: Can be one of:
- Single tuple of (name, actions)
- List of tuples [(name, actions), ...]
- List of dicts with keys: name, actions, and optionally context_matchers
Examples:
# Basic failpoint
client.configure_failpoints(("test_fp", "return(error)"))
# Multiple basic failpoints
client.configure_failpoints([("fp1", "return"), ("fp2", "sleep(1000)")])
# Probability-based failpoint
client.configure_failpoints(("test_fp", "50%return(error)"))
# Context-based failpoint
client.configure_failpoints([{
"name": "test_fp",
"actions": "return(error)",
"context_matchers": {"tenant_id": ".*test.*"}
}])
"""
# Handle single tuple case
if isinstance(config_strings, tuple):
config_strings = [config_strings]
# Convert to server format
body: list[dict[str, str | dict[str, str]]] = []
for config in config_strings:
if isinstance(config, tuple):
# Simple (name, actions) tuple
body.append({"name": config[0], "actions": config[1]})
elif isinstance(config, dict):
# Dict with name, actions, and optional context_matchers
server_config: dict[str, str | dict[str, str]] = {
"name": config["name"],
"actions": config["actions"],
}
if "context_matchers" in config:
server_config["context_matchers"] = config["context_matchers"]
body.append(server_config)
else:
raise ValueError(f"Invalid config format: {config}")
res = self.request(
"PUT",
f"{self.api}/debug/v1/failpoints",
json=[{"name": name, "actions": actions} for name, actions in pairs],
f"{self.api_root()}/debug/v1/failpoints",
json=body,
headers=self.headers(TokenScope.ADMIN),
)
log.info(f"Got failpoints request response code {res.status_code}")
res.raise_for_status()
if res.status_code != 200:
self.raise_api_exception(res)
def get_tenants_placement(self) -> defaultdict[str, dict[str, Any]]:
"""

View File

@@ -309,25 +309,64 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
def check_status(self):
self.get(f"http://localhost:{self.port}/v1/status").raise_for_status()
def configure_failpoints(self, config_strings: tuple[str, str] | list[tuple[str, str]]):
def configure_failpoints(
self,
config_strings: tuple[str, str]
| list[tuple[str, str]]
| list[dict[str, str | dict[str, str]]],
):
"""
Configure failpoints for testing purposes.
Args:
config_strings: Can be one of:
- Single tuple of (name, actions)
- List of tuples [(name, actions), ...]
- List of dicts with keys: name, actions, and optionally context_matchers
Examples:
# Basic failpoint
client.configure_failpoints(("test_fp", "return(error)"))
# Multiple basic failpoints
client.configure_failpoints([("fp1", "return"), ("fp2", "sleep(1000)")])
# Probability-based failpoint
client.configure_failpoints(("test_fp", "50%return(error)"))
# Context-based failpoint
client.configure_failpoints([{
"name": "test_fp",
"actions": "return(error)",
"context_matchers": {"tenant_id": ".*test.*"}
}])
"""
self.is_testing_enabled_or_skip()
# Handle single tuple case
if isinstance(config_strings, tuple):
pairs = [config_strings]
else:
pairs = config_strings
config_strings = [config_strings]
log.info(f"Requesting config failpoints: {repr(pairs)}")
# Convert to server format
body: list[dict[str, str | dict[str, str]]] = []
for config in config_strings:
if isinstance(config, tuple):
# Simple (name, actions) tuple
body.append({"name": config[0], "actions": config[1]})
elif isinstance(config, dict):
# Dict with name, actions, and optional context_matchers
server_config = {"name": config["name"], "actions": config["actions"]}
if "context_matchers" in config:
server_config["context_matchers"] = config["context_matchers"]
body.append(server_config)
else:
raise ValueError(f"Invalid config format: {config}")
res = self.put(
f"http://localhost:{self.port}/v1/failpoints",
json=[{"name": name, "actions": actions} for name, actions in pairs],
)
log.info(f"Got failpoints request response code {res.status_code}")
self.verbose_error(res)
res_json = res.json()
assert res_json is None
return res_json
res = self.post(f"{self.base_url}/v1/failpoints", json=body)
if res.status_code != 200:
raise PageserverApiException(
f"Failed to configure failpoints: {res.text}", res.status_code
)
def reload_auth_validation_keys(self):
res = self.post(f"http://localhost:{self.port}/v1/reload_auth_validation_keys")

View File

@@ -8,7 +8,6 @@ import pytest
import requests
from fixtures.common_types import Lsn, TenantId, TenantTimelineId, TimelineId
from fixtures.log_helper import log
from fixtures.metrics import Metrics, MetricsGetter, parse_metrics
from fixtures.utils import EnhancedJSONEncoder, wait_until
@@ -155,25 +154,62 @@ class SafekeeperHttpClient(requests.Session, MetricsGetter):
if not self.is_testing_enabled:
pytest.skip("safekeeper was built without 'testing' feature")
def configure_failpoints(self, config_strings: tuple[str, str] | list[tuple[str, str]]):
def configure_failpoints(
self,
config_strings: tuple[str, str]
| list[tuple[str, str]]
| list[dict[str, str | dict[str, str]]],
):
"""
Configure failpoints for testing purposes.
Args:
config_strings: Can be one of:
- Single tuple of (name, actions)
- List of tuples [(name, actions), ...]
- List of dicts with keys: name, actions, and optionally context_matchers
Examples:
# Basic failpoint
client.configure_failpoints(("test_fp", "return(error)"))
# Multiple basic failpoints
client.configure_failpoints([("fp1", "return"), ("fp2", "sleep(1000)")])
# Probability-based failpoint
client.configure_failpoints(("test_fp", "50%return(error)"))
# Context-based failpoint
client.configure_failpoints([{
"name": "test_fp",
"actions": "return(error)",
"context_matchers": {"tenant_id": ".*test.*"}
}])
"""
self.is_testing_enabled_or_skip()
# Handle single tuple case
if isinstance(config_strings, tuple):
pairs = [config_strings]
else:
pairs = config_strings
config_strings = [config_strings]
log.info(f"Requesting config failpoints: {repr(pairs)}")
# Convert to server format
body: list[dict[str, str | dict[str, str]]] = []
for config in config_strings:
if isinstance(config, tuple):
# Simple (name, actions) tuple
body.append({"name": config[0], "actions": config[1]})
elif isinstance(config, dict):
# Dict with name, actions, and optional context_matchers
server_config = {"name": config["name"], "actions": config["actions"]}
if "context_matchers" in config:
server_config["context_matchers"] = config["context_matchers"]
body.append(server_config)
else:
raise ValueError(f"Invalid config format: {config}")
res = self.put(
f"http://localhost:{self.port}/v1/failpoints",
json=[{"name": name, "actions": actions} for name, actions in pairs],
)
log.info(f"Got failpoints request response code {res.status_code}")
res.raise_for_status()
res_json = res.json()
assert res_json is None
return res_json
res = self.post(f"http://localhost:{self.port}/v1/failpoints", json=body)
if res.status_code != 200:
raise RuntimeError(f"Failed to configure failpoints: {res.text}")
def tenant_delete_force(self, tenant_id: TenantId) -> dict[Any, Any]:
res = self.delete(f"http://localhost:{self.port}/v1/tenant/{tenant_id}")
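
All four fixtures now accept an optional `context_matchers` map from context field name to a regex. The server-side semantics are not spelled out in this diff; one plausible reading, sketched purely as an illustration (the struct, field names, and the all-matchers-must-match rule are assumptions, not neon_failpoint's actual implementation), is that an armed failpoint only fires when every configured regex matches the corresponding value in the caller-supplied context:

use std::collections::HashMap;
use regex::Regex;

// Purely illustrative: assumes a failpoint fires only if every configured
// matcher matches the corresponding field of the caller-supplied context.
struct ContextMatchers {
    matchers: HashMap<String, Regex>,
}

impl ContextMatchers {
    fn matches(&self, context: &HashMap<String, String>) -> bool {
        self.matchers
            .iter()
            .all(|(key, re)| context.get(key).is_some_and(|value| re.is_match(value)))
    }
}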