diff --git a/compute_tools/src/http/routes/failpoints.rs b/compute_tools/src/http/routes/failpoints.rs index d0661b65cf..ea3098c05b 100644 --- a/compute_tools/src/http/routes/failpoints.rs +++ b/compute_tools/src/http/routes/failpoints.rs @@ -1,9 +1,9 @@ use axum::response::{IntoResponse, Response}; use http::StatusCode; +use neon_failpoint::{configure_failpoint, configure_failpoint_with_context}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use tracing::info; -use neon_failpoint::{configure_failpoint, configure_failpoint_with_context}; pub type ConfigureFailpointsRequest = Vec; @@ -39,7 +39,10 @@ pub(in crate::http) async fn configure_failpoints( } for fp in &*failpoints { - info!("cfg failpoint: {} {} (context: {:?})", fp.name, fp.actions, fp.context_matchers); + info!( + "cfg failpoint: {} {} (context: {:?})", + fp.name, fp.actions, fp.context_matchers + ); let cfg_result = if let Some(context_matchers) = fp.context_matchers.clone() { configure_failpoint_with_context(&fp.name, &fp.actions, context_matchers) diff --git a/compute_tools/src/migration.rs b/compute_tools/src/migration.rs index 2acde44044..d44cbd33b0 100644 --- a/compute_tools/src/migration.rs +++ b/compute_tools/src/migration.rs @@ -46,7 +46,8 @@ impl<'m> MigrationRunner<'m> { }); false - }.await; + } + .await; if fail { return Err(anyhow::anyhow!(format!( diff --git a/libs/http-utils/src/failpoints.rs b/libs/http-utils/src/failpoints.rs index bae5153062..7d351f9e97 100644 --- a/libs/http-utils/src/failpoints.rs +++ b/libs/http-utils/src/failpoints.rs @@ -1,8 +1,8 @@ use hyper::{Body, Request, Response, StatusCode}; +use neon_failpoint::{configure_failpoint, configure_failpoint_with_context, has_failpoints}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use tokio_util::sync::CancellationToken; -use neon_failpoint::{configure_failpoint, configure_failpoint_with_context, has_failpoints}; use crate::error::ApiError; use crate::json::{json_request, json_response}; @@ -39,7 +39,12 @@ pub async fn failpoints_handler( let failpoints: ConfigureFailpointsRequest = json_request(&mut request).await?; for fp in failpoints { - tracing::info!("cfg failpoint: {} {} (context: {:?})", fp.name, fp.actions, fp.context_matchers); + tracing::info!( + "cfg failpoint: {} {} (context: {:?})", + fp.name, + fp.actions, + fp.context_matchers + ); let cfg_result = if let Some(context_matchers) = fp.context_matchers { configure_failpoint_with_context(&fp.name, &fp.actions, context_matchers) @@ -49,7 +54,9 @@ pub async fn failpoints_handler( if let Err(err) = cfg_result { return Err(ApiError::BadRequest(anyhow::anyhow!( - "Failed to configure failpoint '{}': {}", fp.name, err + "Failed to configure failpoint '{}': {}", + fp.name, + err ))); } } diff --git a/libs/neon_failpoint/examples/context_demo.rs b/libs/neon_failpoint/examples/context_demo.rs index 38d3333560..c0fdba9f3e 100644 --- a/libs/neon_failpoint/examples/context_demo.rs +++ b/libs/neon_failpoint/examples/context_demo.rs @@ -1,4 +1,6 @@ -use neon_failpoint::{configure_failpoint_with_context, failpoint, failpoint_context, FailpointResult}; +use neon_failpoint::{ + configure_failpoint_with_context, failpoint, failpoint_context, FailpointResult, +}; use std::collections::HashMap; #[tokio::main] @@ -12,10 +14,11 @@ async fn main() { context_matchers.insert("operation".to_string(), "backup".to_string()); configure_failpoint_with_context( - "backup_operation", - "return(simulated_failure)", - context_matchers - ).unwrap(); + "backup_operation", + "return(simulated_failure)", + context_matchers, + ) + .unwrap(); // Test with matching context let context = failpoint_context! { @@ -54,4 +57,4 @@ async fn main() { println!("Failpoint cancelled"); } } -} \ No newline at end of file +} diff --git a/libs/neon_failpoint/src/lib.rs b/libs/neon_failpoint/src/lib.rs index a8a407e6ee..08e72f08c8 100644 --- a/libs/neon_failpoint/src/lib.rs +++ b/libs/neon_failpoint/src/lib.rs @@ -101,9 +101,9 @@ pub fn configure_failpoint(name: &str, actions: &str) -> Result<()> { notifiers: Vec::new(), trigger_count: 0, }; - + let mut failpoints = FAILPOINTS.write(); - + // If this failpoint already exists, notify all waiting tasks if let Some(existing_config) = failpoints.get(name) { // Notify all waiting tasks about the configuration change @@ -111,9 +111,9 @@ pub fn configure_failpoint(name: &str, actions: &str) -> Result<()> { notifier.notify_waiters(); } } - + failpoints.insert(name.to_string(), config); - + tracing::info!("Configured failpoint: {} = {}", name, actions); Ok(()) } @@ -131,9 +131,9 @@ pub fn configure_failpoint_with_context( notifiers: Vec::new(), trigger_count: 0, }; - + let mut failpoints = FAILPOINTS.write(); - + // If this failpoint already exists, notify all waiting tasks if let Some(existing_config) = failpoints.get(name) { // Notify all waiting tasks about the configuration change @@ -141,9 +141,9 @@ pub fn configure_failpoint_with_context( notifier.notify_waiters(); } } - + failpoints.insert(name.to_string(), config); - + tracing::info!("Configured failpoint with context: {} = {}", name, actions); Ok(()) } @@ -151,16 +151,16 @@ pub fn configure_failpoint_with_context( /// Remove a failpoint configuration pub fn remove_failpoint(name: &str) { let mut failpoints = FAILPOINTS.write(); - + // Notify all waiting tasks before removing if let Some(existing_config) = failpoints.get(name) { for notifier in &existing_config.notifiers { notifier.notify_waiters(); } } - + failpoints.remove(name); - + tracing::info!("Removed failpoint: {}", name); } @@ -247,10 +247,10 @@ fn execute_action<'a>( } FailpointAction::Sleep(millis) => { tracing::info!("Failpoint {} sleeping for {}ms", name, millis); - + // Create a notifier for this task let notifier = Arc::new(Notify::new()); - + // Add the notifier to the failpoint configuration { let mut failpoints = FAILPOINTS.write(); @@ -258,13 +258,13 @@ fn execute_action<'a>( fp_config.notifiers.push(notifier.clone()); } } - + // Create cleanup guard to remove notifier when done let _guard = NotifierCleanupGuard { failpoint_name: name.to_string(), notifier: notifier.clone(), }; - + // Sleep with cancellation support tokio::select! { _ = tokio::time::sleep(Duration::from_millis(*millis)) => { @@ -283,10 +283,10 @@ fn execute_action<'a>( } FailpointAction::Pause => { tracing::info!("Failpoint {} pausing", name); - + // Create a notifier for this task let notifier = Arc::new(Notify::new()); - + // Add the notifier to the failpoint configuration { let mut failpoints = FAILPOINTS.write(); @@ -294,13 +294,13 @@ fn execute_action<'a>( fp_config.notifiers.push(notifier.clone()); } } - + // Create cleanup guard to remove notifier when done let _guard = NotifierCleanupGuard { failpoint_name: name.to_string(), notifier: notifier.clone(), }; - + // Wait until cancelled or notified tokio::select! { _ = cancel_token.cancelled() => { @@ -400,7 +400,7 @@ fn matches_context(matchers: &HashMap, context: &FailpointContex let Some(value) = context.get(key) else { return false; }; - + // Try to compile and match as regex if let Ok(regex) = Regex::new(pattern) { if !regex.is_match(value) { @@ -427,7 +427,9 @@ impl Drop for NotifierCleanupGuard { let mut failpoints = FAILPOINTS.write(); if let Some(fp_config) = failpoints.get_mut(&self.failpoint_name) { // Remove this specific notifier from the list - fp_config.notifiers.retain(|n| !Arc::ptr_eq(n, &self.notifier)); + fp_config + .notifiers + .retain(|n| !Arc::ptr_eq(n, &self.notifier)); } } } @@ -461,7 +463,7 @@ mod tests { let start = std::time::Instant::now(); let result = failpoint("test_sleep", None).await; let duration = start.elapsed(); - + matches!(result, FailpointResult::Continue); assert!(duration >= Duration::from_millis(10)); } @@ -471,13 +473,13 @@ mod tests { configure_failpoint("test_pause", "pause").unwrap(); let cancel_token = CancellationToken::new(); let cancel_token_clone = cancel_token.clone(); - + // Cancel after 10ms tokio::spawn(async move { tokio::time::sleep(Duration::from_millis(10)).await; cancel_token_clone.cancel(); }); - + let result = failpoint_with_cancellation("test_pause", None, &cancel_token).await; matches!(result, FailpointResult::Cancelled); } @@ -486,28 +488,29 @@ mod tests { async fn test_failpoint_pause_and_resume() { let failpoint_name = "test_pause_resume"; configure_failpoint(failpoint_name, "pause").unwrap(); - + let start = std::time::Instant::now(); - + // Start a task that hits the failpoint - let task = tokio::spawn(async move { - failpoint(failpoint_name, None).await - }); - + let task = tokio::spawn(async move { failpoint(failpoint_name, None).await }); + // Give it time to hit the failpoint and pause tokio::time::sleep(Duration::from_millis(10)).await; - + // Reconfigure the failpoint to "off" to resume configure_failpoint(failpoint_name, "off").unwrap(); - + // The task should complete quickly now - let result = timeout(Duration::from_millis(100), task).await.unwrap().unwrap(); - + let result = timeout(Duration::from_millis(100), task) + .await + .unwrap() + .unwrap(); + let duration = start.elapsed(); - + // Should have resumed and continued matches!(result, FailpointResult::Continue); - + // Should have taken at least 10ms (initial pause) but less than 1 second assert!(duration >= Duration::from_millis(10)); assert!(duration < Duration::from_millis(1000)); @@ -517,22 +520,23 @@ mod tests { async fn test_failpoint_pause_and_change_to_return() { let failpoint_name = "test_pause_to_return"; configure_failpoint(failpoint_name, "pause").unwrap(); - + // Start a task that hits the failpoint - let task = tokio::spawn(async move { - failpoint(failpoint_name, None).await - }); - + let task = tokio::spawn(async move { failpoint(failpoint_name, None).await }); + // Give it time to hit the failpoint and pause tokio::time::sleep(Duration::from_millis(10)).await; - + // Reconfigure the failpoint to return a value configure_failpoint(failpoint_name, "return(resumed)").unwrap(); - + // The task should complete by resuming (Continue), not with the new return value // The new configuration only applies to future hits of the failpoint - let result = timeout(Duration::from_millis(100), task).await.unwrap().unwrap(); - + let result = timeout(Duration::from_millis(100), task) + .await + .unwrap() + .unwrap(); + matches!(result, FailpointResult::Continue); } @@ -540,28 +544,29 @@ mod tests { async fn test_failpoint_sleep_and_resume() { let failpoint_name = "test_sleep_resume"; configure_failpoint(failpoint_name, "sleep(1000)").unwrap(); // 1 second sleep - + let start = std::time::Instant::now(); - + // Start a task that hits the failpoint - let task = tokio::spawn(async move { - failpoint(failpoint_name, None).await - }); - + let task = tokio::spawn(async move { failpoint(failpoint_name, None).await }); + // Give it time to hit the failpoint and start sleeping tokio::time::sleep(Duration::from_millis(10)).await; - + // Reconfigure the failpoint to "off" to resume configure_failpoint(failpoint_name, "off").unwrap(); - + // The task should complete quickly now - let result = timeout(Duration::from_millis(100), task).await.unwrap().unwrap(); - + let result = timeout(Duration::from_millis(100), task) + .await + .unwrap() + .unwrap(); + let duration = start.elapsed(); - + // Should have resumed and continued matches!(result, FailpointResult::Continue); - + // Should have taken much less than the original 1 second sleep assert!(duration < Duration::from_millis(500)); } @@ -570,28 +575,29 @@ mod tests { async fn test_failpoint_sleep_and_change_duration() { let failpoint_name = "test_sleep_change"; configure_failpoint(failpoint_name, "sleep(1000)").unwrap(); // 1 second sleep - + let start = std::time::Instant::now(); - + // Start a task that hits the failpoint - let task = tokio::spawn(async move { - failpoint(failpoint_name, None).await - }); - + let task = tokio::spawn(async move { failpoint(failpoint_name, None).await }); + // Give it time to hit the failpoint and start sleeping tokio::time::sleep(Duration::from_millis(10)).await; - + // Change the sleep duration to a shorter time configure_failpoint(failpoint_name, "sleep(50)").unwrap(); - + // The task should complete by resuming (Continue), not by sleeping the new duration - let result = timeout(Duration::from_millis(200), task).await.unwrap().unwrap(); - + let result = timeout(Duration::from_millis(200), task) + .await + .unwrap() + .unwrap(); + let duration = start.elapsed(); - + // Should have resumed and continued matches!(result, FailpointResult::Continue); - + // Should have taken much less than the original 1 second sleep assert!(duration < Duration::from_millis(500)); } @@ -600,21 +606,22 @@ mod tests { async fn test_failpoint_remove_during_pause() { let failpoint_name = "test_remove_pause"; configure_failpoint(failpoint_name, "pause").unwrap(); - + // Start a task that hits the failpoint - let task = tokio::spawn(async move { - failpoint(failpoint_name, None).await - }); - + let task = tokio::spawn(async move { failpoint(failpoint_name, None).await }); + // Give it time to hit the failpoint and pause tokio::time::sleep(Duration::from_millis(10)).await; - + // Remove the failpoint entirely remove_failpoint(failpoint_name); - + // The task should complete quickly now - let result = timeout(Duration::from_millis(100), task).await.unwrap().unwrap(); - + let result = timeout(Duration::from_millis(100), task) + .await + .unwrap() + .unwrap(); + // Should have resumed and continued matches!(result, FailpointResult::Continue); } @@ -623,31 +630,34 @@ mod tests { async fn test_multiple_paused_tasks_resume() { let failpoint_name = "test_multiple_pause"; configure_failpoint(failpoint_name, "pause").unwrap(); - + // Start multiple tasks that hit the same failpoint - let task1 = tokio::spawn(async move { - failpoint(failpoint_name, None).await - }); - - let task2 = tokio::spawn(async move { - failpoint(failpoint_name, None).await - }); - - let task3 = tokio::spawn(async move { - failpoint(failpoint_name, None).await - }); - + let task1 = tokio::spawn(async move { failpoint(failpoint_name, None).await }); + + let task2 = tokio::spawn(async move { failpoint(failpoint_name, None).await }); + + let task3 = tokio::spawn(async move { failpoint(failpoint_name, None).await }); + // Give them time to hit the failpoint and pause tokio::time::sleep(Duration::from_millis(10)).await; - + // Reconfigure the failpoint to "off" to resume all tasks configure_failpoint(failpoint_name, "off").unwrap(); - + // All tasks should complete quickly now - let result1 = timeout(Duration::from_millis(100), task1).await.unwrap().unwrap(); - let result2 = timeout(Duration::from_millis(100), task2).await.unwrap().unwrap(); - let result3 = timeout(Duration::from_millis(100), task3).await.unwrap().unwrap(); - + let result1 = timeout(Duration::from_millis(100), task1) + .await + .unwrap() + .unwrap(); + let result2 = timeout(Duration::from_millis(100), task2) + .await + .unwrap() + .unwrap(); + let result3 = timeout(Duration::from_millis(100), task3) + .await + .unwrap() + .unwrap(); + // All should have resumed and continued matches!(result1, FailpointResult::Continue); matches!(result2, FailpointResult::Continue); @@ -657,15 +667,13 @@ mod tests { #[tokio::test] async fn test_no_race_condition_on_rapid_config_changes() { let failpoint_name = "test_race_condition"; - + // Start with pause configure_failpoint(failpoint_name, "pause").unwrap(); - + // Start a task that will hit the failpoint - let task = tokio::spawn(async move { - failpoint(failpoint_name, None).await - }); - + let task = tokio::spawn(async move { failpoint(failpoint_name, None).await }); + // Rapidly change the configuration multiple times for i in 0..10 { tokio::time::sleep(Duration::from_millis(1)).await; @@ -675,13 +683,16 @@ mod tests { configure_failpoint(failpoint_name, "return(rapid_change)").unwrap(); } } - + // Finally set it to return configure_failpoint(failpoint_name, "return(final)").unwrap(); - + // The task should complete by resuming (Continue), not with any return value - let result = timeout(Duration::from_millis(100), task).await.unwrap().unwrap(); - + let result = timeout(Duration::from_millis(100), task) + .await + .unwrap() + .unwrap(); + // Should resume normally since it was woken up by a configuration change matches!(result, FailpointResult::Continue); } @@ -691,25 +702,25 @@ mod tests { let mut context = HashMap::new(); context.insert("user_id".to_string(), "123".to_string()); context.insert("operation".to_string(), "read".to_string()); - + let mut matchers = HashMap::new(); matchers.insert("user_id".to_string(), "123".to_string()); matchers.insert("operation".to_string(), "read".to_string()); - + configure_failpoint_with_context("test_context", "return(matched)", matchers).unwrap(); - + let result = failpoint("test_context", Some(&context)).await; if let FailpointResult::Return(Some(value)) = result { assert_eq!(value, "matched"); } else { panic!("Expected return result"); } - + // Test with non-matching context let mut wrong_context = HashMap::new(); wrong_context.insert("user_id".to_string(), "456".to_string()); wrong_context.insert("operation".to_string(), "write".to_string()); - + let result = failpoint("test_context", Some(&wrong_context)).await; matches!(result, FailpointResult::Continue); } @@ -717,16 +728,16 @@ mod tests { #[tokio::test] async fn test_failpoint_panic() { configure_failpoint("test_panic", "panic(test message)").unwrap(); - + let result = tokio::task::spawn_blocking(|| { std::panic::catch_unwind(|| { let rt = tokio::runtime::Runtime::new().unwrap(); - rt.block_on(async { - failpoint("test_panic", None).await - }) + rt.block_on(async { failpoint("test_panic", None).await }) }) - }).await.unwrap(); - + }) + .await + .unwrap(); + assert!(result.is_err()); let panic_msg = result.unwrap_err(); if let Some(msg) = panic_msg.downcast_ref::() { @@ -741,7 +752,7 @@ mod tests { #[tokio::test] async fn test_failpoint_probability_simple() { configure_failpoint("test_prob_simple", "100%return(always)").unwrap(); - + // 100% probability should always trigger let result = failpoint("test_prob_simple", None).await; if let FailpointResult::Return(Some(value)) = result { @@ -749,7 +760,7 @@ mod tests { } else { panic!("Expected return result with 100% probability"); } - + // 0% probability should never trigger configure_failpoint("test_prob_never", "0%return(never)").unwrap(); let result = failpoint("test_prob_never", None).await; @@ -759,7 +770,7 @@ mod tests { #[tokio::test] async fn test_failpoint_probability_with_count() { configure_failpoint("test_prob_count", "100%2*return(limited)").unwrap(); - + // First two calls should trigger (100% probability, max 2 times) let result1 = failpoint("test_prob_count", None).await; if let FailpointResult::Return(Some(value)) = result1 { @@ -767,14 +778,14 @@ mod tests { } else { panic!("Expected return result on first call"); } - + let result2 = failpoint("test_prob_count", None).await; if let FailpointResult::Return(Some(value)) = result2 { assert_eq!(value, "limited"); } else { panic!("Expected return result on second call"); } - + // Third call should not trigger (count limit exceeded) let result3 = failpoint("test_prob_count", None).await; matches!(result3, FailpointResult::Continue); @@ -783,11 +794,11 @@ mod tests { #[tokio::test] async fn test_failpoint_probability_nested_actions() { configure_failpoint("test_prob_sleep", "100%sleep(10)").unwrap(); - + let start = std::time::Instant::now(); let result = failpoint("test_prob_sleep", None).await; let duration = start.elapsed(); - + matches!(result, FailpointResult::Continue); assert!(duration >= Duration::from_millis(10)); } @@ -807,7 +818,7 @@ mod tests { } else { panic!("Expected return action"); } - + // Test probability with count let action_spec = parse_action_spec("25%3*return(limited)").unwrap(); if let FailpointAction::Return(Some(ref v)) = action_spec.action { @@ -815,7 +826,7 @@ mod tests { } else { panic!("Expected return action"); } - + // Test probability with other actions let action_spec = parse_action_spec("75%sleep(100)").unwrap(); if let FailpointAction::Sleep(100) = action_spec.action { @@ -830,7 +841,7 @@ mod tests { // Test probability > 100 let result = parse_action_spec("150%return(invalid)"); assert!(result.is_err()); - + // Test invalid format let result = parse_action_spec("50%invalid_action"); assert!(result.is_err()); @@ -839,11 +850,11 @@ mod tests { #[tokio::test] async fn test_audit_specific_patterns() { // Test patterns found in the audit - + // 1. panic(failpoint) - from test_sharding.py let action_spec = parse_action_spec("panic(failpoint)").unwrap(); matches!(action_spec.action, FailpointAction::Panic(msg) if msg == "failpoint"); - + // 2. 50%return(15) - from test_bad_connection.py let action_spec = parse_action_spec("50%return(15)").unwrap(); if let FailpointAction::Return(Some(ref v)) = action_spec.action { @@ -851,7 +862,7 @@ mod tests { } else { panic!("Expected return action"); } - + // 3. 10%3*return(3000) - from test_bad_connection.py let action_spec = parse_action_spec("10%3*return(3000)").unwrap(); if let FailpointAction::Return(Some(ref v)) = action_spec.action { @@ -859,7 +870,7 @@ mod tests { } else { panic!("Expected return action"); } - + // 4. 10%return(60000) - from test_bad_connection.py let action_spec = parse_action_spec("10%return(60000)").unwrap(); if let FailpointAction::Return(Some(ref v)) = action_spec.action { @@ -867,12 +878,12 @@ mod tests { } else { panic!("Expected return action"); } - + // Test that these patterns work in practice configure_failpoint("test_audit_panic", "panic(storage controller crash)").unwrap(); configure_failpoint("test_audit_prob", "100%return(42)").unwrap(); configure_failpoint("test_audit_count", "100%1*return(limited)").unwrap(); - + // Test the probability-based return let result = failpoint("test_audit_prob", None).await; if let FailpointResult::Return(Some(value)) = result { @@ -880,7 +891,7 @@ mod tests { } else { panic!("Expected return result"); } - + // Test the count-limited probability let result1 = failpoint("test_audit_count", None).await; if let FailpointResult::Return(Some(value)) = result1 { @@ -888,9 +899,9 @@ mod tests { } else { panic!("Expected return result on first call"); } - + // Second call should not trigger (count limit exceeded) let result2 = failpoint("test_audit_count", None).await; matches!(result2, FailpointResult::Continue); } -} \ No newline at end of file +} diff --git a/libs/neon_failpoint/src/macros.rs b/libs/neon_failpoint/src/macros.rs index 25ac819507..4db7445147 100644 --- a/libs/neon_failpoint/src/macros.rs +++ b/libs/neon_failpoint/src/macros.rs @@ -151,7 +151,9 @@ macro_rules! failpoint_return { }}; ($name:literal, $context:expr) => {{ if cfg!(feature = "testing") { - if let $crate::FailpointResult::Return(value) = $crate::failpoint($name, Some($context)).await { + if let $crate::FailpointResult::Return(value) = + $crate::failpoint($name, Some($context)).await + { return value.parse().unwrap_or_default(); } } @@ -170,7 +172,9 @@ macro_rules! failpoint_bail { }}; ($name:literal, $context:expr, $error_msg:literal) => {{ if cfg!(feature = "testing") { - if let $crate::FailpointResult::Return(_) = $crate::failpoint($name, Some($context)).await { + if let $crate::FailpointResult::Return(_) = + $crate::failpoint($name, Some($context)).await + { anyhow::bail!($error_msg); } } @@ -184,4 +188,4 @@ pub use failpoint_bail; pub use failpoint_context; pub use failpoint_return; pub use pausable_failpoint; -pub use sleep_millis_async; \ No newline at end of file +pub use sleep_millis_async; diff --git a/libs/utils/src/failpoint_support.rs b/libs/utils/src/failpoint_support.rs index 87abb09384..b0dc84cb8e 100644 --- a/libs/utils/src/failpoint_support.rs +++ b/libs/utils/src/failpoint_support.rs @@ -1,9 +1,9 @@ //! Failpoint support code shared between pageserver and safekeepers. -//! +//! //! This module provides a compatibility layer over the new neon_failpoint crate. +pub use neon_failpoint::{configure_failpoint as apply_failpoint, has_failpoints, init}; use tokio_util::sync::CancellationToken; -pub use neon_failpoint::{configure_failpoint as apply_failpoint, init, has_failpoints}; /// Declare a failpoint that can use to `pause` failpoint action. /// This is now a compatibility wrapper around the new neon_failpoint crate. diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index 36dada1e89..4e5974bad5 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -17,6 +17,7 @@ use anyhow::{Context, anyhow}; use async_compression::tokio::write::GzipEncoder; use bytes::{BufMut, Bytes, BytesMut}; use fail::fail_point; +use neon_failpoint as fail; use pageserver_api::key::{Key, rel_block_to_key}; use pageserver_api::reltag::{RelTag, SlruKind}; use postgres_ffi::pg_constants::{PG_HBA, PGDATA_SPECIAL_FILES}; diff --git a/pageserver/src/consumption_metrics/disk_cache.rs b/pageserver/src/consumption_metrics/disk_cache.rs index f1dad8793d..49e3a02459 100644 --- a/pageserver/src/consumption_metrics/disk_cache.rs +++ b/pageserver/src/consumption_metrics/disk_cache.rs @@ -6,6 +6,8 @@ use camino::{Utf8Path, Utf8PathBuf}; use super::{NewMetricsRoot, NewRawMetric, RawMetric}; use crate::consumption_metrics::NewMetricsRefRoot; +use neon_failpoint as fail; + pub(super) fn read_metrics_from_serde_value( json_value: serde_json::Value, ) -> anyhow::Result> { diff --git a/pageserver/src/deletion_queue/deleter.rs b/pageserver/src/deletion_queue/deleter.rs index 691ba75cc7..79ecc07893 100644 --- a/pageserver/src/deletion_queue/deleter.rs +++ b/pageserver/src/deletion_queue/deleter.rs @@ -8,6 +8,7 @@ use std::time::Duration; +use neon_failpoint as fail; use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel}; use tokio_util::sync::CancellationToken; use tracing::{info, warn}; diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 7030ac368d..25e024dc30 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -28,6 +28,7 @@ use http_utils::{RequestExt, RouterBuilder}; use humantime::format_rfc3339; use hyper::{Body, Request, Response, StatusCode, Uri, header}; use metrics::launch_timestamp::LaunchTimestamp; +use neon_failpoint as fail; use pageserver_api::models::virtual_file::IoMode; use pageserver_api::models::{ DetachBehavior, DownloadRemoteLayersTaskSpawnRequest, IngestAuxFilesRequest, diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 1fc7e4eac7..c1443a67b0 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -19,6 +19,7 @@ use futures::future::BoxFuture; use futures::{FutureExt, Stream}; use itertools::Itertools; use jsonwebtoken::TokenData; +use neon_failpoint as fail; use once_cell::sync::OnceCell; use pageserver_api::config::{ GetVectoredConcurrentIo, PageServicePipeliningConfig, PageServicePipeliningConfigPipelined, diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 8532a6938f..e70da0401c 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -14,6 +14,7 @@ use crate::{PERF_TRACE_TARGET, ensure_walingest}; use anyhow::Context; use bytes::{Buf, Bytes, BytesMut}; use enum_map::Enum; +use neon_failpoint as fail; use pageserver_api::key::{ AUX_FILES_KEY, CHECKPOINT_KEY, CONTROLFILE_KEY, CompactKey, DBDIR_KEY, Key, RelDirExists, TWOPHASEDIR_KEY, dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index f67269851a..b64648ebad 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -30,6 +30,7 @@ use enumset::EnumSet; use futures::StreamExt; use futures::stream::FuturesUnordered; use itertools::Itertools as _; +use neon_failpoint as fail; use once_cell::sync::Lazy; pub use pageserver_api::models::TenantState; use pageserver_api::models::{self, RelSizeMigration}; diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 15853d3614..bb8361732d 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -12,6 +12,7 @@ use anyhow::Context; use camino::{Utf8DirEntry, Utf8Path, Utf8PathBuf}; use futures::StreamExt; use itertools::Itertools; +use neon_failpoint as fail; use pageserver_api::key::Key; use pageserver_api::models::{DetachBehavior, LocationConfigMode}; use pageserver_api::shard::{ diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index fd65000379..31ea6f9eb9 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -194,6 +194,7 @@ pub(crate) use download::{ }; use index::GcCompactionState; pub(crate) use index::LayerFileMetadata; +use neon_failpoint as fail; use pageserver_api::models::{RelSizeMigration, TimelineArchivalState, TimelineVisibilityState}; use pageserver_api::shard::{ShardIndex, TenantShardId}; use regex::Regex; diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index 84989e0fb8..69b6c0816e 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -11,6 +11,7 @@ use std::time::SystemTime; use anyhow::{Context, anyhow}; use camino::{Utf8Path, Utf8PathBuf}; +use neon_failpoint as fail; use pageserver_api::shard::TenantShardId; use remote_storage::{ DownloadError, DownloadKind, DownloadOpts, GenericRemoteStorage, ListingMode, RemotePath, diff --git a/pageserver/src/tenant/remote_timeline_client/upload.rs b/pageserver/src/tenant/remote_timeline_client/upload.rs index f2fbf656a6..e1499bf218 100644 --- a/pageserver/src/tenant/remote_timeline_client/upload.rs +++ b/pageserver/src/tenant/remote_timeline_client/upload.rs @@ -8,6 +8,7 @@ use anyhow::{Context, bail}; use bytes::Bytes; use camino::Utf8Path; use fail::fail_point; +use neon_failpoint as fail; use pageserver_api::shard::TenantShardId; use remote_storage::{GenericRemoteStorage, RemotePath, TimeTravelError}; use tokio::fs::{self, File}; diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index a9bc0a060b..8db15afabf 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -39,6 +39,7 @@ use layer_manager::{ LayerManagerLockHolder, LayerManagerReadGuard, LayerManagerWriteGuard, LockedLayerManager, Shutdown, }; +use neon_failpoint as fail; use once_cell::sync::Lazy; use pageserver_api::config::tenant_conf_defaults::DEFAULT_PITR_INTERVAL; diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 171f9d1284..e60ec99291 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -26,6 +26,7 @@ use enumset::EnumSet; use fail::fail_point; use futures::FutureExt; use itertools::Itertools; +use neon_failpoint as fail; use once_cell::sync::Lazy; use pageserver_api::config::tenant_conf_defaults::DEFAULT_CHECKPOINT_DISTANCE; use pageserver_api::key::{KEY_SIZE, Key}; diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs index f7dc44be90..431507afeb 100644 --- a/pageserver/src/tenant/timeline/delete.rs +++ b/pageserver/src/tenant/timeline/delete.rs @@ -2,6 +2,7 @@ use std::ops::{Deref, DerefMut}; use std::sync::Arc; use anyhow::Context; +use neon_failpoint as fail; use pageserver_api::models::TimelineState; use pageserver_api::shard::TenantShardId; use remote_storage::DownloadError; diff --git a/pageserver/src/tenant/timeline/detach_ancestor.rs b/pageserver/src/tenant/timeline/detach_ancestor.rs index 223e888e27..f89671e86f 100644 --- a/pageserver/src/tenant/timeline/detach_ancestor.rs +++ b/pageserver/src/tenant/timeline/detach_ancestor.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use anyhow::Context; use bytes::Bytes; use http_utils::error::ApiError; +use neon_failpoint as fail; use pageserver_api::key::Key; use pageserver_api::keyspace::KeySpace; use pageserver_api::models::DetachBehavior; diff --git a/pageserver/src/tenant/timeline/uninit.rs b/pageserver/src/tenant/timeline/uninit.rs index beebf35462..45536f6866 100644 --- a/pageserver/src/tenant/timeline/uninit.rs +++ b/pageserver/src/tenant/timeline/uninit.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use anyhow::Context; use camino::Utf8PathBuf; +use neon_failpoint as fail; use tracing::{error, info, info_span}; use utils::fs_ext; use utils::id::TimelineId; diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index f619c69599..e9f7c2a483 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -11,6 +11,7 @@ use bytes::BytesMut; use chrono::{NaiveDateTime, Utc}; use fail::fail_point; use futures::StreamExt; +use neon_failpoint as fail; use postgres_backend::is_expected_io_error; use postgres_connection::PgConnectionConfig; use postgres_ffi::WAL_SEGMENT_SIZE; diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index f3fa8a5e47..c098b76e55 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -879,7 +879,8 @@ impl WalSender<'_, IO> { |_| { false } ); self.end_pos > self.start_pos - }.await; + } + .await; if have_something_to_send { trace!("got end_pos {:?}, streaming", self.end_pos); @@ -938,7 +939,8 @@ impl WalSender<'_, IO> { |_| { true } ); false - }.await; + } + .await; if fp { tokio::time::sleep(POLL_STATE_TIMEOUT).await; return Ok(None);