diff --git a/libs/neon_failpoint/src/lib.rs b/libs/neon_failpoint/src/lib.rs index ee793129cc..4d845177a5 100644 --- a/libs/neon_failpoint/src/lib.rs +++ b/libs/neon_failpoint/src/lib.rs @@ -26,7 +26,7 @@ pub use either; // re-export for use in macros /// Global failpoint registry // TODO: switch to simple_rcu, but it's in `utils` -static FAILPOINTS: Lazy>>> = +static FAILPOINTS: Lazy>>>>> = Lazy::new(|| Default::default()); /// Configuration for a single failpoint @@ -191,10 +191,10 @@ pub fn configure_failpoint(name: &str, actions: &str) -> Result<()> { // If this failpoint already exists, notify all waiting tasks if let Some(existing_config) = failpoints.get(name) { // Notify all waiting tasks about the configuration change - existing_config.notifiers.notify_all(); + existing_config.lock().unwrap().notifiers.notify_all(); } - failpoints.insert(name.to_string(), config); + failpoints.insert(name.to_string(), Arc::new(std::sync::Mutex::new(config))); tracing::info!("Configured failpoint: {} = {}", name, actions); Ok(()) @@ -219,10 +219,10 @@ pub fn configure_failpoint_with_context( // If this failpoint already exists, notify all waiting tasks if let Some(existing_config) = failpoints.get(name) { // Notify all waiting tasks about the configuration change - existing_config.notifiers.notify_all(); + existing_config.lock().unwrap().notifiers.notify_all(); } - failpoints.insert(name.to_string(), config); + failpoints.insert(name.to_string(), Arc::new(std::sync::Mutex::new(config))); tracing::info!("Configured failpoint with context: {} = {}", name, actions); Ok(()) @@ -234,7 +234,7 @@ pub fn remove_failpoint(name: &str) { // Notify all waiting tasks before removing if let Some(existing_config) = failpoints.get(name) { - existing_config.notifiers.notify_all(); + existing_config.lock().unwrap().notifiers.notify_all(); } failpoints.remove(name); @@ -252,7 +252,7 @@ pub fn list() -> Vec<(impl std::fmt::Display, impl std::fmt::Display)> { .read() .unwrap() .iter() - .map(|(name, config)| (name.clone(), format!("{config:?}"))) + .map(|(name, config)| (name.clone(), format!("{:?}", config.lock().unwrap()))) .collect::>() } @@ -275,12 +275,19 @@ pub fn failpoint_with_cancellation( return Either::Left(FailpointResult::Continue); } - // Check if the failpoint exists and get the necessary info - let (action_spec, context_matchers) = { + // Get a clone of the failpoint config Arc - this minimizes the time we hold the global lock + let config_arc = { let failpoints = FAILPOINTS.read().unwrap(); let Some(config) = failpoints.get(name) else { return Either::Left(FailpointResult::Continue); }; + config.clone() + }; + // Global lock is dropped here + + // Now work with the individual config's lock + let (action_spec, context_matchers) = { + let config = config_arc.lock().unwrap(); (config.action_spec.clone(), config.context_matchers.clone()) }; @@ -297,11 +304,8 @@ pub fn failpoint_with_cancellation( if let Some(max_count) = action_spec.max_count { // Get the current trigger count let trigger_count = { - let failpoints = FAILPOINTS.read().unwrap(); - failpoints - .get(name) - .map(|config| config.trigger_count) - .unwrap_or(0) + let config = config_arc.lock().unwrap(); + config.trigger_count }; if trigger_count >= max_count { @@ -318,34 +322,25 @@ pub fn failpoint_with_cancellation( // Increment trigger count { - let mut failpoints = FAILPOINTS.write().unwrap(); - if let Some(fp_config) = failpoints.get_mut(name) { - fp_config.trigger_count += 1; - } + let mut config = config_arc.lock().unwrap(); + config.trigger_count += 1; } } tracing::info!("Hit failpoint: {}", name); - execute_action(name, &action_spec, context, cancel_token) + execute_action(name, &action_spec, context, cancel_token, config_arc) } /// Create a notifier for a failpoint -fn create_failpoint_notifier(name: &str) -> FailpointNotifier { - let mut failpoints = FAILPOINTS.write().unwrap(); - if let Some(fp_config) = failpoints.get_mut(name) { - let cleanup_name = name.to_string(); - fp_config.notifiers.create_notifier(move |notifier| { - let mut failpoints = FAILPOINTS.write().unwrap(); - if let Some(fp_config) = failpoints.get_mut(&cleanup_name) { - fp_config.notifiers.remove_notifier(notifier); - } - }) - } else { - // Failpoint doesn't exist, create a dummy notifier - let notifier = Arc::new(Notify::new()); - FailpointNotifier::new(notifier, |_| {}) - } +fn create_failpoint_notifier(config_arc: Arc>) -> FailpointNotifier { + let cleanup_config_arc = config_arc.clone(); + + let mut config = config_arc.lock().unwrap(); + config.notifiers.create_notifier(move |notifier| { + let mut config = cleanup_config_arc.lock().unwrap(); + config.notifiers.remove_notifier(notifier); + }) } /// Execute a specific action (used for recursive execution in probability-based actions) @@ -354,6 +349,7 @@ fn execute_action( action_spec: &FailpointActionSpec, _context: Option<&FailpointContext>, cancel_token: &CancellationToken, + config_arc: Arc>, ) -> Either + Send>>> { match &action_spec.action { FailpointAction::Off => Either::Left(FailpointResult::Continue), @@ -378,7 +374,7 @@ fn execute_action( tracing::info!("Failpoint {} sleeping for {}ms", name, millis); // Create a notifier for this task - let notifier = create_failpoint_notifier(&name); + let notifier = create_failpoint_notifier(config_arc); // Sleep with cancellation support tokio::select! { @@ -405,7 +401,7 @@ fn execute_action( tracing::info!("Failpoint {} pausing", name); // Create a notifier for this task - let notifier = create_failpoint_notifier(&name); + let notifier = create_failpoint_notifier(config_arc); // Wait until cancelled or notified tokio::select! {