From 904a63dff52ec06ff15a9e0b71511127c028f824 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 11 Jul 2025 18:12:21 +0000 Subject: [PATCH] push down Arc> --- libs/neon_failpoint/src/lib.rs | 70 ++++++++++++++++------------------ 1 file changed, 32 insertions(+), 38 deletions(-) diff --git a/libs/neon_failpoint/src/lib.rs b/libs/neon_failpoint/src/lib.rs index 070960a985..9597093442 100644 --- a/libs/neon_failpoint/src/lib.rs +++ b/libs/neon_failpoint/src/lib.rs @@ -10,6 +10,7 @@ use std::collections::HashMap; use std::future::Future; use std::pin::Pin; use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::Duration; use anyhow::Result; @@ -26,7 +27,7 @@ pub use either; // re-export for use in macros /// Global failpoint registry // TODO: switch to simple_rcu, but it's in `utils` -static FAILPOINTS: Lazy>>>>> = +static FAILPOINTS: Lazy>>>> = Lazy::new(|| Default::default()); /// Configuration for a single failpoint @@ -37,9 +38,9 @@ pub struct FailpointConfig { /// Optional context matching rules pub context_matchers: Option>, /// Notify objects for tasks waiting on this failpoint - pub notifiers: FailpointNotifiers, + pub notifiers: Arc>, /// Counter for probability-based actions - pub trigger_count: u32, + pub trigger_count: AtomicUsize, } /// Specification for a failpoint action with probability @@ -105,11 +106,11 @@ impl FailpointNotifiers { /// /// Returns a `FailpointNotifier` that automatically removes itself /// from the collection when dropped. - pub fn create_notifier(&mut self, config_arc: Arc>) -> FailpointNotifier { + pub fn create_notifier(&mut self, notifiers_arc: Arc>) -> FailpointNotifier { let notifier = Arc::new(Notify::new()); self.notifiers.push(notifier.clone()); - FailpointNotifier::new(notifier, config_arc) + FailpointNotifier::new(notifier, notifiers_arc) } /// Notify all waiting tasks @@ -132,15 +133,15 @@ impl FailpointNotifiers { /// - Automatically cleans up when dropped pub struct FailpointNotifier { notifier: Arc, - config_arc: Arc>, + notifiers_arc: Arc>, } impl FailpointNotifier { /// Create a new notifier - pub fn new(notifier: Arc, config_arc: Arc>) -> Self { + pub fn new(notifier: Arc, notifiers_arc: Arc>) -> Self { Self { notifier, - config_arc, + notifiers_arc, } } @@ -152,8 +153,8 @@ impl FailpointNotifier { impl Drop for FailpointNotifier { fn drop(&mut self) { - let mut config = self.config_arc.lock().unwrap(); - config.notifiers.remove_notifier(&self.notifier); + let mut notifiers = self.notifiers_arc.lock().unwrap(); + notifiers.remove_notifier(&self.notifier); } } @@ -175,8 +176,8 @@ pub fn configure_failpoint(name: &str, actions: &str) -> Result<()> { let config = FailpointConfig { action_spec, context_matchers: None, - notifiers: FailpointNotifiers::new(), - trigger_count: 0, + notifiers: Arc::new(std::sync::Mutex::new(FailpointNotifiers::new())), + trigger_count: AtomicUsize::new(0), }; let mut failpoints = FAILPOINTS.write().unwrap(); @@ -184,10 +185,10 @@ pub fn configure_failpoint(name: &str, actions: &str) -> Result<()> { // If this failpoint already exists, notify all waiting tasks if let Some(existing_config) = failpoints.get(name) { // Notify all waiting tasks about the configuration change - existing_config.lock().unwrap().notifiers.notify_all(); + existing_config.notifiers.lock().unwrap().notify_all(); } - failpoints.insert(name.to_string(), Arc::new(std::sync::Mutex::new(config))); + failpoints.insert(name.to_string(), Arc::new(config)); tracing::info!("Configured failpoint: {} = {}", name, actions); Ok(()) @@ -203,8 +204,8 @@ pub fn configure_failpoint_with_context( let config = FailpointConfig { action_spec, context_matchers: Some(context_matchers), - notifiers: FailpointNotifiers::new(), - trigger_count: 0, + notifiers: Arc::new(std::sync::Mutex::new(FailpointNotifiers::new())), + trigger_count: AtomicUsize::new(0), }; let mut failpoints = FAILPOINTS.write().unwrap(); @@ -212,10 +213,10 @@ pub fn configure_failpoint_with_context( // If this failpoint already exists, notify all waiting tasks if let Some(existing_config) = failpoints.get(name) { // Notify all waiting tasks about the configuration change - existing_config.lock().unwrap().notifiers.notify_all(); + existing_config.notifiers.lock().unwrap().notify_all(); } - failpoints.insert(name.to_string(), Arc::new(std::sync::Mutex::new(config))); + failpoints.insert(name.to_string(), Arc::new(config)); tracing::info!("Configured failpoint with context: {} = {}", name, actions); Ok(()) @@ -227,7 +228,7 @@ pub fn remove_failpoint(name: &str) { // Notify all waiting tasks before removing if let Some(existing_config) = failpoints.get(name) { - existing_config.lock().unwrap().notifiers.notify_all(); + existing_config.notifiers.lock().unwrap().notify_all(); } failpoints.remove(name); @@ -245,7 +246,7 @@ pub fn list() -> Vec<(impl std::fmt::Display, impl std::fmt::Display)> { .read() .unwrap() .iter() - .map(|(name, config)| (name.clone(), format!("{:?}", config.lock().unwrap()))) + .map(|(name, config)| (name.clone(), format!("{:?}", config))) .collect::>() } @@ -269,7 +270,7 @@ pub fn failpoint_with_cancellation( } // Get a clone of the failpoint config Arc - this minimizes the time we hold the global lock - let config_arc = { + let config = { let failpoints = FAILPOINTS.read().unwrap(); let Some(config) = failpoints.get(name) else { return Either::Left(FailpointResult::Continue); @@ -280,7 +281,6 @@ pub fn failpoint_with_cancellation( // Now work with the individual config's lock let (action_spec, context_matchers) = { - let config = config_arc.lock().unwrap(); (config.action_spec.clone(), config.context_matchers.clone()) }; @@ -296,12 +296,9 @@ pub fn failpoint_with_cancellation( // Check if we've hit the max count if let Some(max_count) = action_spec.max_count { // Get the current trigger count - let trigger_count = { - let config = config_arc.lock().unwrap(); - config.trigger_count - }; + let trigger_count = config.trigger_count.load(Ordering::Relaxed); - if trigger_count >= max_count { + if trigger_count >= max_count as usize { return Either::Left(FailpointResult::Continue); } } @@ -314,21 +311,18 @@ pub fn failpoint_with_cancellation( } // Increment trigger count - { - let mut config = config_arc.lock().unwrap(); - config.trigger_count += 1; - } + config.trigger_count.fetch_add(1, Ordering::Relaxed); } tracing::info!("Hit failpoint: {}", name); - execute_action(name, &action_spec, context, cancel_token, config_arc) + execute_action(name, &action_spec, context, cancel_token, config) } /// Create a notifier for a failpoint -fn create_failpoint_notifier(config_arc: Arc>) -> FailpointNotifier { - let mut config = config_arc.lock().unwrap(); - config.notifiers.create_notifier(config_arc.clone()) +fn create_failpoint_notifier(config: Arc) -> FailpointNotifier { + let mut notifiers = config.notifiers.lock().unwrap(); + notifiers.create_notifier(config.notifiers.clone()) } /// Execute a specific action (used for recursive execution in probability-based actions) @@ -337,7 +331,7 @@ fn execute_action( action_spec: &FailpointActionSpec, _context: Option<&FailpointContext>, cancel_token: &CancellationToken, - config_arc: Arc>, + config: Arc, ) -> Either + Send>>> { match &action_spec.action { FailpointAction::Off => Either::Left(FailpointResult::Continue), @@ -362,7 +356,7 @@ fn execute_action( tracing::info!("Failpoint {} sleeping for {}ms", name, millis); // Create a notifier for this task - let notifier = create_failpoint_notifier(config_arc); + let notifier = create_failpoint_notifier(config); // Sleep with cancellation support tokio::select! { @@ -389,7 +383,7 @@ fn execute_action( tracing::info!("Failpoint {} pausing", name); // Create a notifier for this task - let notifier = create_failpoint_notifier(config_arc); + let notifier = create_failpoint_notifier(config); // Wait until cancelled or notified tokio::select! {