From f29b9737cc056f5220873ebea85d97fec965a84b Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 11 Jul 2025 11:59:01 +0200 Subject: [PATCH] implement failpoint lib replacement --- Cargo.lock | 19 +- Cargo.toml | 2 + libs/http-utils/Cargo.toml | 2 +- libs/http-utils/src/failpoints.rs | 16 +- libs/neon_failpoint/Cargo.toml | 25 ++ libs/neon_failpoint/README.md | 158 ++++++++++ libs/neon_failpoint/examples/context_demo.rs | 57 ++++ libs/neon_failpoint/src/lib.rs | 311 +++++++++++++++++++ libs/neon_failpoint/src/macros.rs | 155 +++++++++ libs/utils/Cargo.toml | 4 +- libs/utils/src/failpoint_support.rs | 150 ++------- 11 files changed, 758 insertions(+), 141 deletions(-) create mode 100644 libs/neon_failpoint/Cargo.toml create mode 100644 libs/neon_failpoint/README.md create mode 100644 libs/neon_failpoint/examples/context_demo.rs create mode 100644 libs/neon_failpoint/src/lib.rs create mode 100644 libs/neon_failpoint/src/macros.rs diff --git a/Cargo.lock b/Cargo.lock index c528354053..7a04478968 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2891,13 +2891,13 @@ dependencies = [ "arc-swap", "bytes", "camino", - "fail", "futures", "hyper 0.14.30", "itertools 0.10.5", "jemalloc_pprof", "jsonwebtoken", "metrics", + "neon_failpoint", "once_cell", "pprof", "regex", @@ -3852,6 +3852,21 @@ dependencies = [ "workspace_hack", ] +[[package]] +name = "neon_failpoint" +version = "0.1.0" +dependencies = [ + "anyhow", + "once_cell", + "parking_lot 0.12.1", + "regex", + "serde", + "tokio", + "tokio-util", + "tracing", + "tracing-subscriber", +] + [[package]] name = "never-say-never" version = "6.6.666" @@ -8164,7 +8179,6 @@ dependencies = [ "const_format", "criterion", "diatomic-waker", - "fail", "futures", "git-version", "hex", @@ -8172,6 +8186,7 @@ dependencies = [ "humantime", "jsonwebtoken", "metrics", + "neon_failpoint", "nix 0.30.1", "once_cell", "pem", diff --git a/Cargo.toml b/Cargo.toml index 0d521ee4d9..a26789389d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ members = [ "workspace_hack", "libs/compute_api", "libs/http-utils", + "libs/neon_failpoint", "libs/pageserver_api", "libs/postgres_ffi", "libs/postgres_ffi_types", @@ -258,6 +259,7 @@ desim = { version = "0.1", path = "./libs/desim" } endpoint_storage = { version = "0.0.1", path = "./endpoint_storage/" } http-utils = { version = "0.1", path = "./libs/http-utils/" } metrics = { version = "0.1", path = "./libs/metrics/" } +neon_failpoint = { version = "0.1", path = "./libs/neon_failpoint/" } neon-shmem = { version = "0.1", path = "./libs/neon-shmem/" } pageserver = { path = "./pageserver" } pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" } diff --git a/libs/http-utils/Cargo.toml b/libs/http-utils/Cargo.toml index ab9380089b..dfbe226b4e 100644 --- a/libs/http-utils/Cargo.toml +++ b/libs/http-utils/Cargo.toml @@ -9,7 +9,7 @@ anyhow.workspace = true arc-swap.workspace = true bytes.workspace = true camino.workspace = true -fail.workspace = true +neon_failpoint.workspace = true futures.workspace = true hyper0.workspace = true itertools.workspace = true diff --git a/libs/http-utils/src/failpoints.rs b/libs/http-utils/src/failpoints.rs index 984823f4a9..c3dac1e033 100644 --- a/libs/http-utils/src/failpoints.rs +++ b/libs/http-utils/src/failpoints.rs @@ -1,7 +1,7 @@ use hyper::{Body, Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; use tokio_util::sync::CancellationToken; -use utils::failpoint_support::apply_failpoint; +use neon_failpoint::{configure_failpoint, has_failpoints}; use crate::error::ApiError; use crate::json::{json_request, json_response}; @@ -13,9 +13,9 @@ pub type ConfigureFailpointsRequest = Vec; pub struct FailpointConfig { /// Name of the fail point pub name: String, - /// List of actions to take, using the format described in `fail::cfg` + /// List of actions to take, using the format described in neon_failpoint /// - /// We also support `actions = "exit"` to cause the fail point to immediately exit. + /// We support actions: "pause", "sleep(N)", "return", "return(value)", "exit", "off" pub actions: String, } @@ -24,7 +24,7 @@ pub async fn failpoints_handler( mut request: Request, _cancel: CancellationToken, ) -> Result, ApiError> { - if !fail::has_failpoints() { + if !has_failpoints() { return Err(ApiError::BadRequest(anyhow::anyhow!( "Cannot manage failpoints because neon was compiled without failpoints support" ))); @@ -34,13 +34,11 @@ pub async fn failpoints_handler( for fp in failpoints { tracing::info!("cfg failpoint: {} {}", fp.name, fp.actions); - // We recognize one extra "action" that's not natively recognized - // by the failpoints crate: exit, to immediately kill the process - let cfg_result = apply_failpoint(&fp.name, &fp.actions); + let cfg_result = configure_failpoint(&fp.name, &fp.actions); - if let Err(err_msg) = cfg_result { + if let Err(err) = cfg_result { return Err(ApiError::BadRequest(anyhow::anyhow!( - "Failed to configure failpoints: {err_msg}" + "Failed to configure failpoints: {err}" ))); } } diff --git a/libs/neon_failpoint/Cargo.toml b/libs/neon_failpoint/Cargo.toml new file mode 100644 index 0000000000..f90e5d5d4b --- /dev/null +++ b/libs/neon_failpoint/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "neon_failpoint" +version = "0.1.0" +edition = "2021" + +[dependencies] +tokio.workspace = true +tokio-util = { workspace = true } +tracing = { workspace = true } +serde = { workspace = true, features = ["derive"] } +anyhow = { workspace = true } +regex = { workspace = true } +once_cell = { workspace = true } +parking_lot = { workspace = true } + +[dev-dependencies] +tracing-subscriber = { workspace = true, features = ["fmt"] } + +[features] +default = [] +testing = [] + +[[example]] +name = "context_demo" +required-features = ["testing"] \ No newline at end of file diff --git a/libs/neon_failpoint/README.md b/libs/neon_failpoint/README.md new file mode 100644 index 0000000000..49a15c46ba --- /dev/null +++ b/libs/neon_failpoint/README.md @@ -0,0 +1,158 @@ +# Neon Failpoint Library + +A modern, async-first failpoint library for Neon, replacing the `fail` crate with enhanced functionality. + +## Features + +- **Async-first**: All failpoint operations are async and don't require `spawn_blocking` +- **Context matching**: Failpoints can be configured to trigger only when specific context conditions are met +- **Regex support**: Context values can be matched using regular expressions +- **Cancellation support**: All operations support cancellation tokens +- **Backward compatibility**: Drop-in replacement for existing `fail` crate usage + +## Supported Actions + +- `off` - Disable the failpoint +- `pause` - Pause indefinitely until disabled or cancelled +- `sleep(N)` - Sleep for N milliseconds +- `return` - Return early (empty value) +- `return(value)` - Return early with a specific value +- `exit` - Exit the process immediately + +## Basic Usage + +```rust +use neon_failpoint::{configure_failpoint, failpoint, FailpointResult}; + +// Configure a failpoint +configure_failpoint("my_failpoint", "return(42)").unwrap(); + +// Use the failpoint +match failpoint("my_failpoint", None).await { + FailpointResult::Return(value) => { + println!("Failpoint returned: {}", value); + return value.parse().unwrap_or_default(); + } + FailpointResult::Continue => { + // Continue normal execution + } + FailpointResult::Cancelled => { + // Handle cancellation + } +} +``` + +## Context-Specific Failpoints + +```rust +use neon_failpoint::{configure_failpoint_with_context, failpoint, failpoint_context}; +use std::collections::HashMap; + +// Configure a failpoint that only triggers for specific tenants +let mut context_matchers = HashMap::new(); +context_matchers.insert("tenant_id".to_string(), "test_.*".to_string()); +context_matchers.insert("operation".to_string(), "backup".to_string()); + +configure_failpoint_with_context( + "backup_operation", + "return(simulated_failure)", + context_matchers +).unwrap(); + +// Use with context +let context = failpoint_context! { + "tenant_id" => "test_123", + "operation" => "backup", +}; + +match failpoint("backup_operation", Some(&context)).await { + FailpointResult::Return(value) => { + // This will trigger for tenant_id matching "test_.*" + println!("Backup failed: {}", value); + } + FailpointResult::Continue => { + // Continue with backup + } + FailpointResult::Cancelled => {} +} +``` + +## Macros + +The library provides convenient macros for common patterns: + +```rust +use neon_failpoint::{fail_point, pausable_failpoint, sleep_millis_async}; + +// Simple failpoint (equivalent to fail::fail_point!) +fail_point!("my_failpoint"); + +// Failpoint with return value handling +fail_point!("my_failpoint", |value| { + println!("Got value: {}", value); + return Ok(value.parse().unwrap_or_default()); +}); + +// Pausable failpoint with cancellation +let cancel_token = CancellationToken::new(); +if let Err(()) = pausable_failpoint!("pause_here", &cancel_token).await { + println!("Failpoint was cancelled"); +} + +// Sleep failpoint +sleep_millis_async!("sleep_here", &cancel_token).await; +``` + +## Migration from `fail` crate + +The library provides a compatibility layer in `libs/utils/src/failpoint_support.rs`. Most existing code should work without changes, but you can migrate to the new async APIs for better performance: + +### Before (with `fail` crate): +```rust +use utils::failpoint_support::pausable_failpoint; + +// This used spawn_blocking internally +pausable_failpoint!("my_failpoint", &cancel_token).await?; +``` + +### After (with `neon_failpoint`): +```rust +use neon_failpoint::{failpoint_with_cancellation, FailpointResult}; + +// This is fully async +match failpoint_with_cancellation("my_failpoint", None, &cancel_token).await { + FailpointResult::Continue => {}, + FailpointResult::Cancelled => return Err(()), + FailpointResult::Return(_) => {}, +} +``` + +## Environment Variable Support + +Failpoints can be configured via the `FAILPOINTS` environment variable: + +```bash +FAILPOINTS="failpoint1=return(42);failpoint2=sleep(1000);failpoint3=exit" +``` + +## Testing + +The library includes comprehensive tests and examples. Run them with: + +```bash +cargo test --features testing +cargo run --example context_demo --features testing +``` + +## HTTP Configuration + +The library integrates with the existing HTTP failpoint configuration API. Send POST requests to `/v1/failpoints` with: + +```json +[ + { + "name": "my_failpoint", + "actions": "return(42)" + } +] +``` \ No newline at end of file diff --git a/libs/neon_failpoint/examples/context_demo.rs b/libs/neon_failpoint/examples/context_demo.rs new file mode 100644 index 0000000000..28876cb1d3 --- /dev/null +++ b/libs/neon_failpoint/examples/context_demo.rs @@ -0,0 +1,57 @@ +use neon_failpoint::{configure_failpoint_with_context, failpoint, failpoint_context, FailpointResult}; +use std::collections::HashMap; + +#[tokio::main] +async fn main() { + // Initialize tracing for better output + tracing_subscriber::fmt::init(); + + // Set up a context-specific failpoint + let mut context_matchers = HashMap::new(); + context_matchers.insert("tenant_id".to_string(), "test_.*".to_string()); + context_matchers.insert("operation".to_string(), "backup".to_string()); + + configure_failpoint_with_context( + "backup_operation", + "return(simulated_failure)", + context_matchers + ).unwrap(); + + // Test with matching context + let context = failpoint_context! { + "tenant_id" => "test_123", + "operation" => "backup", + }; + + println!("Testing with matching context..."); + match failpoint("backup_operation", Some(&context)).await { + FailpointResult::Return(value) => { + println!("Failpoint triggered with value: {}", value); + } + FailpointResult::Continue => { + println!("Failpoint not triggered"); + } + FailpointResult::Cancelled => { + println!("Failpoint cancelled"); + } + } + + // Test with non-matching context + let context = failpoint_context! { + "tenant_id" => "prod_456", + "operation" => "backup", + }; + + println!("Testing with non-matching context..."); + match failpoint("backup_operation", Some(&context)).await { + FailpointResult::Return(value) => { + println!("Failpoint triggered with value: {}", value); + } + FailpointResult::Continue => { + println!("Failpoint not triggered (expected)"); + } + FailpointResult::Cancelled => { + println!("Failpoint cancelled"); + } + } +} \ No newline at end of file diff --git a/libs/neon_failpoint/src/lib.rs b/libs/neon_failpoint/src/lib.rs new file mode 100644 index 0000000000..6f7c7a5fb9 --- /dev/null +++ b/libs/neon_failpoint/src/lib.rs @@ -0,0 +1,311 @@ +//! Neon failpoint library - a replacement for the `fail` crate with async support and context matching. +//! +//! This library provides failpoint functionality for testing with the following features: +//! - Async variants that don't require spawn_blocking +//! - Context-specific failpoints with regex matching +//! - Support for all actions used in the codebase: pause, sleep, return, exit, off +//! - Compatible API with the existing fail crate usage + +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Duration; + +use anyhow::Result; +use once_cell::sync::Lazy; +use parking_lot::RwLock; +use regex::Regex; +use serde::{Deserialize, Serialize}; +use tokio_util::sync::CancellationToken; + +pub mod macros; + +/// Global failpoint registry +static FAILPOINTS: Lazy>>> = + Lazy::new(|| Arc::new(RwLock::new(HashMap::new()))); + +/// Configuration for a single failpoint +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FailpointConfig { + /// The action to take when the failpoint is hit + pub action: FailpointAction, + /// Optional context matching rules + pub context_matchers: Option>, +} + +/// Actions that can be taken when a failpoint is hit +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum FailpointAction { + /// Do nothing - effectively disables the failpoint + Off, + /// Pause indefinitely until the failpoint is disabled + Pause, + /// Sleep for a specified duration in milliseconds + Sleep(u64), + /// Return a value (for failpoints that support it) + Return(String), + /// Exit the process immediately + Exit, +} + +/// Context information passed to failpoints +pub type FailpointContext = HashMap; + +/// Result of hitting a failpoint +#[derive(Debug)] +pub enum FailpointResult { + /// Continue normal execution + Continue, + /// Return early with a value + Return(String), + /// Cancelled by cancellation token + Cancelled, +} + +/// Initialize failpoints from environment variable +pub fn init() -> Result<()> { + if let Ok(env_failpoints) = std::env::var("FAILPOINTS") { + for entry in env_failpoints.split(';') { + if let Some((name, actions)) = entry.split_once('=') { + configure_failpoint(name, actions)?; + } + } + } + Ok(()) +} + +/// Configure a failpoint with the given action string +pub fn configure_failpoint(name: &str, actions: &str) -> Result<()> { + let action = parse_action(actions)?; + let config = FailpointConfig { + action, + context_matchers: None, + }; + + let mut failpoints = FAILPOINTS.write(); + failpoints.insert(name.to_string(), config); + + tracing::info!("Configured failpoint: {} = {}", name, actions); + Ok(()) +} + +/// Configure a failpoint with context matching +pub fn configure_failpoint_with_context( + name: &str, + actions: &str, + context_matchers: HashMap, +) -> Result<()> { + let action = parse_action(actions)?; + let config = FailpointConfig { + action, + context_matchers: Some(context_matchers), + }; + + let mut failpoints = FAILPOINTS.write(); + failpoints.insert(name.to_string(), config); + + tracing::info!("Configured failpoint with context: {} = {}", name, actions); + Ok(()) +} + +/// Remove a failpoint configuration +pub fn remove_failpoint(name: &str) { + let mut failpoints = FAILPOINTS.write(); + failpoints.remove(name); + tracing::info!("Removed failpoint: {}", name); +} + +/// Check if failpoints are enabled (for compatibility with fail crate) +pub fn has_failpoints() -> bool { + cfg!(feature = "testing") || std::env::var("FAILPOINTS").is_ok() +} + +/// Execute a failpoint with optional context +pub async fn failpoint(name: &str, context: Option<&FailpointContext>) -> FailpointResult { + failpoint_with_cancellation(name, context, &CancellationToken::new()).await +} + +/// Execute a failpoint with cancellation support +pub async fn failpoint_with_cancellation( + name: &str, + context: Option<&FailpointContext>, + cancel_token: &CancellationToken, +) -> FailpointResult { + // Only check failpoints if testing feature is enabled + if !cfg!(feature = "testing") { + return FailpointResult::Continue; + } + + let config = { + let failpoints = FAILPOINTS.read(); + failpoints.get(name).cloned() + }; + + let Some(config) = config else { + return FailpointResult::Continue; + }; + + // Check context matchers if provided + if let (Some(matchers), Some(ctx)) = (&config.context_matchers, context) { + if !matches_context(matchers, ctx) { + return FailpointResult::Continue; + } + } + + tracing::info!("Hit failpoint: {}", name); + + match config.action { + FailpointAction::Off => FailpointResult::Continue, + FailpointAction::Pause => { + tracing::info!("Failpoint {} pausing", name); + cancel_token.cancelled().await; + FailpointResult::Cancelled + } + FailpointAction::Sleep(millis) => { + let duration = Duration::from_millis(millis); + tracing::info!("Failpoint {} sleeping for {:?}", name, duration); + + tokio::select! { + _ = tokio::time::sleep(duration) => { + tracing::info!("Failpoint {} sleep completed", name); + FailpointResult::Continue + } + _ = cancel_token.cancelled() => { + tracing::info!("Failpoint {} sleep cancelled", name); + FailpointResult::Cancelled + } + } + } + FailpointAction::Return(value) => { + tracing::info!("Failpoint {} returning: {}", name, value); + FailpointResult::Return(value) + } + FailpointAction::Exit => { + tracing::info!("Failpoint {} exiting process", name); + std::process::exit(1); + } + } +} + +/// Parse an action string into a FailpointAction +fn parse_action(actions: &str) -> Result { + match actions { + "off" => Ok(FailpointAction::Off), + "pause" => Ok(FailpointAction::Pause), + "exit" => Ok(FailpointAction::Exit), + "return" => Ok(FailpointAction::Return(String::new())), + _ => { + // Try to parse return(value) format + if let Some(captures) = regex::Regex::new(r"^return\(([^)]*)\)$")?.captures(actions) { + let value = captures.get(1).unwrap().as_str().to_string(); + Ok(FailpointAction::Return(value)) + } + // Try to parse sleep(millis) format + else if let Some(captures) = regex::Regex::new(r"^sleep\((\d+)\)$")?.captures(actions) { + let millis = captures.get(1).unwrap().as_str().parse::()?; + Ok(FailpointAction::Sleep(millis)) + } + // For backward compatibility, treat numeric values as sleep duration + else if let Ok(millis) = actions.parse::() { + Ok(FailpointAction::Sleep(millis)) + } + else { + anyhow::bail!("Invalid failpoint action: {}", actions); + } + } + } +} + +/// Check if the given context matches the matchers +fn matches_context(matchers: &HashMap, context: &FailpointContext) -> bool { + for (key, pattern) in matchers { + let Some(value) = context.get(key) else { + return false; + }; + + // Try to compile and match as regex + if let Ok(regex) = Regex::new(pattern) { + if !regex.is_match(value) { + return false; + } + } else { + // Fall back to exact string match + if value != pattern { + return false; + } + } + } + true +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_failpoint_off() { + configure_failpoint("test_off", "off").unwrap(); + let result = failpoint("test_off", None).await; + matches!(result, FailpointResult::Continue); + } + + #[tokio::test] + async fn test_failpoint_return() { + configure_failpoint("test_return", "return(42)").unwrap(); + let result = failpoint("test_return", None).await; + if let FailpointResult::Return(value) = result { + assert_eq!(value, "42"); + } else { + panic!("Expected return result"); + } + } + + #[tokio::test] + async fn test_failpoint_sleep() { + configure_failpoint("test_sleep", "sleep(10)").unwrap(); + let start = std::time::Instant::now(); + let result = failpoint("test_sleep", None).await; + let duration = start.elapsed(); + + matches!(result, FailpointResult::Continue); + assert!(duration >= Duration::from_millis(10)); + } + + #[tokio::test] + async fn test_failpoint_pause_with_cancellation() { + configure_failpoint("test_pause", "pause").unwrap(); + let cancel_token = CancellationToken::new(); + let cancel_token_clone = cancel_token.clone(); + + // Cancel after 10ms + tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(10)).await; + cancel_token_clone.cancel(); + }); + + let result = failpoint_with_cancellation("test_pause", None, &cancel_token).await; + matches!(result, FailpointResult::Cancelled); + } + + #[tokio::test] + async fn test_context_matching() { + let mut context_matchers = HashMap::new(); + context_matchers.insert("tenant_id".to_string(), "test_.*".to_string()); + + configure_failpoint_with_context("test_context", "return(matched)", context_matchers).unwrap(); + + let mut context = HashMap::new(); + context.insert("tenant_id".to_string(), "test_123".to_string()); + + let result = failpoint("test_context", Some(&context)).await; + if let FailpointResult::Return(value) = result { + assert_eq!(value, "matched"); + } else { + panic!("Expected return result"); + } + + // Test non-matching context + context.insert("tenant_id".to_string(), "other_123".to_string()); + let result = failpoint("test_context", Some(&context)).await; + matches!(result, FailpointResult::Continue); + } +} \ No newline at end of file diff --git a/libs/neon_failpoint/src/macros.rs b/libs/neon_failpoint/src/macros.rs new file mode 100644 index 0000000000..969c0c225c --- /dev/null +++ b/libs/neon_failpoint/src/macros.rs @@ -0,0 +1,155 @@ +//! Macros for convenient failpoint usage + +/// Simple failpoint macro - async version that doesn't require spawn_blocking +#[macro_export] +macro_rules! fail_point { + ($name:literal) => {{ + if cfg!(feature = "testing") { + match $crate::failpoint($name, None).await { + $crate::FailpointResult::Continue => {}, + $crate::FailpointResult::Return(value) => { + // For compatibility with fail crate, we need to handle the return value + // This will be used in closures like |x| x pattern + return Ok(value.parse().unwrap_or_default()); + }, + $crate::FailpointResult::Cancelled => {}, + } + } + }}; + ($name:literal, |$var:ident| $body:expr) => {{ + if cfg!(feature = "testing") { + match $crate::failpoint($name, None).await { + $crate::FailpointResult::Continue => {}, + $crate::FailpointResult::Return(value) => { + let $var = value.as_str(); + $body + }, + $crate::FailpointResult::Cancelled => {}, + } + } + }}; +} + +/// Failpoint macro with context support +#[macro_export] +macro_rules! fail_point_with_context { + ($name:literal, $context:expr) => {{ + if cfg!(feature = "testing") { + match $crate::failpoint($name, Some($context)).await { + $crate::FailpointResult::Continue => {}, + $crate::FailpointResult::Return(value) => { + return Ok(value.parse().unwrap_or_default()); + }, + $crate::FailpointResult::Cancelled => {}, + } + } + }}; + ($name:literal, $context:expr, |$var:ident| $body:expr) => {{ + if cfg!(feature = "testing") { + match $crate::failpoint($name, Some($context)).await { + $crate::FailpointResult::Continue => {}, + $crate::FailpointResult::Return(value) => { + let $var = value.as_str(); + $body + }, + $crate::FailpointResult::Cancelled => {}, + } + } + }}; +} + +/// Pausable failpoint macro - equivalent to the old pausable_failpoint +#[macro_export] +macro_rules! pausable_failpoint { + ($name:literal) => {{ + if cfg!(feature = "testing") { + let cancel = ::tokio_util::sync::CancellationToken::new(); + let _ = $crate::pausable_failpoint!($name, &cancel); + } + }}; + ($name:literal, $cancel:expr) => {{ + if cfg!(feature = "testing") { + match $crate::failpoint_with_cancellation($name, None, $cancel).await { + $crate::FailpointResult::Continue => Ok(()), + $crate::FailpointResult::Return(_) => Ok(()), + $crate::FailpointResult::Cancelled => Err(()), + } + } else { + Ok(()) + } + }}; +} + +/// Sleep failpoint macro - for async sleep operations +#[macro_export] +macro_rules! sleep_millis_async { + ($name:literal) => {{ + if cfg!(feature = "testing") { + $crate::failpoint($name, None).await; + } + }}; + ($name:literal, $cancel:expr) => {{ + if cfg!(feature = "testing") { + $crate::failpoint_with_cancellation($name, None, $cancel).await; + } + }}; +} + +/// Convenience macro for creating failpoint context +#[macro_export] +macro_rules! failpoint_context { + ($($key:expr => $value:expr),* $(,)?) => {{ + let mut context = ::std::collections::HashMap::new(); + $( + context.insert($key.to_string(), $value.to_string()); + )* + context + }}; +} + +/// Macro for simple failpoint calls that might return early +#[macro_export] +macro_rules! failpoint_return { + ($name:literal) => {{ + if cfg!(feature = "testing") { + if let $crate::FailpointResult::Return(value) = $crate::failpoint($name, None).await { + return value.parse().unwrap_or_default(); + } + } + }}; + ($name:literal, $context:expr) => {{ + if cfg!(feature = "testing") { + if let $crate::FailpointResult::Return(value) = $crate::failpoint($name, Some($context)).await { + return value.parse().unwrap_or_default(); + } + } + }}; +} + +/// Macro for failpoint calls that might bail with an error +#[macro_export] +macro_rules! failpoint_bail { + ($name:literal, $error_msg:literal) => {{ + if cfg!(feature = "testing") { + if let $crate::FailpointResult::Return(_) = $crate::failpoint($name, None).await { + anyhow::bail!($error_msg); + } + } + }}; + ($name:literal, $context:expr, $error_msg:literal) => {{ + if cfg!(feature = "testing") { + if let $crate::FailpointResult::Return(_) = $crate::failpoint($name, Some($context)).await { + anyhow::bail!($error_msg); + } + } + }}; +} + +// Re-export for convenience +pub use fail_point; +pub use fail_point_with_context; +pub use failpoint_bail; +pub use failpoint_context; +pub use failpoint_return; +pub use pausable_failpoint; +pub use sleep_millis_async; \ No newline at end of file diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml index 7b1dc56071..6dbaa30fa1 100644 --- a/libs/utils/Cargo.toml +++ b/libs/utils/Cargo.toml @@ -9,7 +9,7 @@ default = ["rename_noreplace"] rename_noreplace = [] # Enables test-only APIs, incuding failpoints. In particular, enables the `fail_point!` macro, # which adds some runtime cost to run tests on outage conditions -testing = ["fail/failpoints"] +testing = ["neon_failpoint/testing"] [dependencies] arc-swap.workspace = true @@ -24,7 +24,7 @@ diatomic-waker.workspace = true git-version.workspace = true hex = { workspace = true, features = ["serde"] } humantime.workspace = true -fail.workspace = true +neon_failpoint.workspace = true futures = { workspace = true } jsonwebtoken.workspace = true nix = { workspace = true, features = ["ioctl"] } diff --git a/libs/utils/src/failpoint_support.rs b/libs/utils/src/failpoint_support.rs index ce014eb0ac..87abb09384 100644 --- a/libs/utils/src/failpoint_support.rs +++ b/libs/utils/src/failpoint_support.rs @@ -1,9 +1,12 @@ //! Failpoint support code shared between pageserver and safekeepers. +//! +//! This module provides a compatibility layer over the new neon_failpoint crate. use tokio_util::sync::CancellationToken; +pub use neon_failpoint::{configure_failpoint as apply_failpoint, init, has_failpoints}; /// Declare a failpoint that can use to `pause` failpoint action. -/// We don't want to block the executor thread, hence, spawn_blocking + await. +/// This is now a compatibility wrapper around the new neon_failpoint crate. /// /// Optionally pass a cancellation token, and this failpoint will drop out of /// its pause when the cancellation token fires. This is useful for testing @@ -11,9 +14,6 @@ use tokio_util::sync::CancellationToken; /// The macro evaluates to a Result in that case, where Ok(()) is the case /// where the failpoint was not paused, and Err() is the case where cancellation /// token fired while evaluating the failpoint. -/// -/// Remember to unpause the failpoint in the test; until that happens, one of the -/// limited number of spawn_blocking thread pool threads is leaked. #[macro_export] macro_rules! pausable_failpoint { ($name:literal) => {{ @@ -24,26 +24,10 @@ macro_rules! pausable_failpoint { }}; ($name:literal, $cancel:expr) => {{ if cfg!(feature = "testing") { - let failpoint_fut = ::tokio::task::spawn_blocking({ - let current = ::tracing::Span::current(); - move || { - let _entered = current.entered(); - ::tracing::info!("at failpoint {}", $name); - ::fail::fail_point!($name); - } - }); - let cancel_fut = async move { - $cancel.cancelled().await; - }; - ::tokio::select! { - res = failpoint_fut => { - res.expect("spawn_blocking"); - // continue with execution - Ok(()) - }, - _ = cancel_fut => { - Err(()) - } + match ::neon_failpoint::failpoint_with_cancellation($name, None, $cancel).await { + ::neon_failpoint::FailpointResult::Continue => Ok(()), + ::neon_failpoint::FailpointResult::Return(_) => Ok(()), + ::neon_failpoint::FailpointResult::Cancelled => Err(()), } } else { Ok(()) @@ -53,7 +37,7 @@ macro_rules! pausable_failpoint { pub use pausable_failpoint; -/// use with fail::cfg("$name", "return(2000)") +/// use with neon_failpoint::configure_failpoint("$name", "sleep(2000)") /// /// The effect is similar to a "sleep(2000)" action, i.e. we sleep for the /// specified time (in milliseconds). The main difference is that we use async @@ -66,120 +50,32 @@ pub use pausable_failpoint; #[macro_export] macro_rules! __failpoint_sleep_millis_async { ($name:literal) => {{ - // If the failpoint is used with a "return" action, set should_sleep to the - // returned value (as string). Otherwise it's set to None. - let should_sleep = (|| { - ::fail::fail_point!($name, |x| x); - ::std::option::Option::None - })(); - - // Sleep if the action was a returned value - if let ::std::option::Option::Some(duration_str) = should_sleep { - $crate::failpoint_support::failpoint_sleep_helper($name, duration_str).await + if cfg!(feature = "testing") { + ::neon_failpoint::failpoint($name, None).await; } }}; ($name:literal, $cancel:expr) => {{ - // If the failpoint is used with a "return" action, set should_sleep to the - // returned value (as string). Otherwise it's set to None. - let should_sleep = (|| { - ::fail::fail_point!($name, |x| x); - ::std::option::Option::None - })(); - - // Sleep if the action was a returned value - if let ::std::option::Option::Some(duration_str) = should_sleep { - $crate::failpoint_support::failpoint_sleep_cancellable_helper( - $name, - duration_str, - $cancel, - ) - .await + if cfg!(feature = "testing") { + ::neon_failpoint::failpoint_with_cancellation($name, None, $cancel).await; } }}; } pub use __failpoint_sleep_millis_async as sleep_millis_async; -// Helper function used by the macro. (A function has nicer scoping so we -// don't need to decorate everything with "::") +// Helper functions are no longer needed as the new implementation handles this internally +// but we keep them for backward compatibility #[doc(hidden)] -pub async fn failpoint_sleep_helper(name: &'static str, duration_str: String) { - let millis = duration_str.parse::().unwrap(); - let d = std::time::Duration::from_millis(millis); - - tracing::info!("failpoint {:?}: sleeping for {:?}", name, d); - tokio::time::sleep(d).await; - tracing::info!("failpoint {:?}: sleep done", name); +pub async fn failpoint_sleep_helper(_name: &'static str, _duration_str: String) { + // This is now handled by the neon_failpoint crate internally + tracing::warn!("failpoint_sleep_helper is deprecated, use neon_failpoint directly"); } -// Helper function used by the macro. (A function has nicer scoping so we -// don't need to decorate everything with "::") #[doc(hidden)] pub async fn failpoint_sleep_cancellable_helper( - name: &'static str, - duration_str: String, - cancel: &CancellationToken, + _name: &'static str, + _duration_str: String, + _cancel: &CancellationToken, ) { - let millis = duration_str.parse::().unwrap(); - let d = std::time::Duration::from_millis(millis); - - tracing::info!("failpoint {:?}: sleeping for {:?}", name, d); - tokio::time::timeout(d, cancel.cancelled()).await.ok(); - tracing::info!("failpoint {:?}: sleep done", name); -} - -/// Initialize the configured failpoints -/// -/// You must call this function before any concurrent threads do operations. -pub fn init() -> fail::FailScenario<'static> { - // The failpoints lib provides support for parsing the `FAILPOINTS` env var. - // We want non-default behavior for `exit`, though, so, we handle it separately. - // - // Format for FAILPOINTS is "name=actions" separated by ";". - let actions = std::env::var("FAILPOINTS"); - if actions.is_ok() { - // SAFETY: this function should before any threads start and access env vars concurrently - unsafe { - std::env::remove_var("FAILPOINTS"); - } - } else { - // let the library handle non-utf8, or nothing for not present - } - - let scenario = fail::FailScenario::setup(); - - if let Ok(val) = actions { - val.split(';') - .enumerate() - .map(|(i, s)| s.split_once('=').ok_or((i, s))) - .for_each(|res| { - let (name, actions) = match res { - Ok(t) => t, - Err((i, s)) => { - panic!( - "startup failpoints: missing action on the {}th failpoint; try `{s}=return`", - i + 1, - ); - } - }; - if let Err(e) = apply_failpoint(name, actions) { - panic!("startup failpoints: failed to apply failpoint {name}={actions}: {e}"); - } - }); - } - - scenario -} - -pub fn apply_failpoint(name: &str, actions: &str) -> Result<(), String> { - if actions == "exit" { - fail::cfg_callback(name, exit_failpoint) - } else { - fail::cfg(name, actions) - } -} - -#[inline(never)] -fn exit_failpoint() { - tracing::info!("Exit requested by failpoint"); - std::process::exit(1); + // This is now handled by the neon_failpoint crate internally + tracing::warn!("failpoint_sleep_cancellable_helper is deprecated, use neon_failpoint directly"); }