push down Arc<Mutex>>

This commit is contained in:
Christian Schwarz
2025-07-11 18:12:21 +00:00
parent 67dbe63275
commit 904a63dff5

View File

@@ -10,6 +10,7 @@ use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;
use anyhow::Result;
@@ -26,7 +27,7 @@ pub use either; // re-export for use in macros
/// Global failpoint registry
// TODO: switch to simple_rcu, but it's in `utils`
static FAILPOINTS: Lazy<Arc<std::sync::RwLock<HashMap<String, Arc<std::sync::Mutex<FailpointConfig>>>>>> =
static FAILPOINTS: Lazy<Arc<std::sync::RwLock<HashMap<String, Arc<FailpointConfig>>>>> =
Lazy::new(|| Default::default());
/// Configuration for a single failpoint
@@ -37,9 +38,9 @@ pub struct FailpointConfig {
/// Optional context matching rules
pub context_matchers: Option<HashMap<String, String>>,
/// Notify objects for tasks waiting on this failpoint
pub notifiers: FailpointNotifiers,
pub notifiers: Arc<std::sync::Mutex<FailpointNotifiers>>,
/// Counter for probability-based actions
pub trigger_count: u32,
pub trigger_count: AtomicUsize,
}
/// Specification for a failpoint action with probability
@@ -105,11 +106,11 @@ impl FailpointNotifiers {
///
/// Returns a `FailpointNotifier` that automatically removes itself
/// from the collection when dropped.
pub fn create_notifier(&mut self, config_arc: Arc<std::sync::Mutex<FailpointConfig>>) -> FailpointNotifier {
pub fn create_notifier(&mut self, notifiers_arc: Arc<std::sync::Mutex<FailpointNotifiers>>) -> FailpointNotifier {
let notifier = Arc::new(Notify::new());
self.notifiers.push(notifier.clone());
FailpointNotifier::new(notifier, config_arc)
FailpointNotifier::new(notifier, notifiers_arc)
}
/// Notify all waiting tasks
@@ -132,15 +133,15 @@ impl FailpointNotifiers {
/// - Automatically cleans up when dropped
pub struct FailpointNotifier {
notifier: Arc<Notify>,
config_arc: Arc<std::sync::Mutex<FailpointConfig>>,
notifiers_arc: Arc<std::sync::Mutex<FailpointNotifiers>>,
}
impl FailpointNotifier {
/// Create a new notifier
pub fn new(notifier: Arc<Notify>, config_arc: Arc<std::sync::Mutex<FailpointConfig>>) -> Self {
pub fn new(notifier: Arc<Notify>, notifiers_arc: Arc<std::sync::Mutex<FailpointNotifiers>>) -> Self {
Self {
notifier,
config_arc,
notifiers_arc,
}
}
@@ -152,8 +153,8 @@ impl FailpointNotifier {
impl Drop for FailpointNotifier {
fn drop(&mut self) {
let mut config = self.config_arc.lock().unwrap();
config.notifiers.remove_notifier(&self.notifier);
let mut notifiers = self.notifiers_arc.lock().unwrap();
notifiers.remove_notifier(&self.notifier);
}
}
@@ -175,8 +176,8 @@ pub fn configure_failpoint(name: &str, actions: &str) -> Result<()> {
let config = FailpointConfig {
action_spec,
context_matchers: None,
notifiers: FailpointNotifiers::new(),
trigger_count: 0,
notifiers: Arc::new(std::sync::Mutex::new(FailpointNotifiers::new())),
trigger_count: AtomicUsize::new(0),
};
let mut failpoints = FAILPOINTS.write().unwrap();
@@ -184,10 +185,10 @@ pub fn configure_failpoint(name: &str, actions: &str) -> Result<()> {
// If this failpoint already exists, notify all waiting tasks
if let Some(existing_config) = failpoints.get(name) {
// Notify all waiting tasks about the configuration change
existing_config.lock().unwrap().notifiers.notify_all();
existing_config.notifiers.lock().unwrap().notify_all();
}
failpoints.insert(name.to_string(), Arc::new(std::sync::Mutex::new(config)));
failpoints.insert(name.to_string(), Arc::new(config));
tracing::info!("Configured failpoint: {} = {}", name, actions);
Ok(())
@@ -203,8 +204,8 @@ pub fn configure_failpoint_with_context(
let config = FailpointConfig {
action_spec,
context_matchers: Some(context_matchers),
notifiers: FailpointNotifiers::new(),
trigger_count: 0,
notifiers: Arc::new(std::sync::Mutex::new(FailpointNotifiers::new())),
trigger_count: AtomicUsize::new(0),
};
let mut failpoints = FAILPOINTS.write().unwrap();
@@ -212,10 +213,10 @@ pub fn configure_failpoint_with_context(
// If this failpoint already exists, notify all waiting tasks
if let Some(existing_config) = failpoints.get(name) {
// Notify all waiting tasks about the configuration change
existing_config.lock().unwrap().notifiers.notify_all();
existing_config.notifiers.lock().unwrap().notify_all();
}
failpoints.insert(name.to_string(), Arc::new(std::sync::Mutex::new(config)));
failpoints.insert(name.to_string(), Arc::new(config));
tracing::info!("Configured failpoint with context: {} = {}", name, actions);
Ok(())
@@ -227,7 +228,7 @@ pub fn remove_failpoint(name: &str) {
// Notify all waiting tasks before removing
if let Some(existing_config) = failpoints.get(name) {
existing_config.lock().unwrap().notifiers.notify_all();
existing_config.notifiers.lock().unwrap().notify_all();
}
failpoints.remove(name);
@@ -245,7 +246,7 @@ pub fn list() -> Vec<(impl std::fmt::Display, impl std::fmt::Display)> {
.read()
.unwrap()
.iter()
.map(|(name, config)| (name.clone(), format!("{:?}", config.lock().unwrap())))
.map(|(name, config)| (name.clone(), format!("{:?}", config)))
.collect::<Vec<_>>()
}
@@ -269,7 +270,7 @@ pub fn failpoint_with_cancellation(
}
// Get a clone of the failpoint config Arc - this minimizes the time we hold the global lock
let config_arc = {
let config = {
let failpoints = FAILPOINTS.read().unwrap();
let Some(config) = failpoints.get(name) else {
return Either::Left(FailpointResult::Continue);
@@ -280,7 +281,6 @@ pub fn failpoint_with_cancellation(
// Now work with the individual config's lock
let (action_spec, context_matchers) = {
let config = config_arc.lock().unwrap();
(config.action_spec.clone(), config.context_matchers.clone())
};
@@ -296,12 +296,9 @@ pub fn failpoint_with_cancellation(
// Check if we've hit the max count
if let Some(max_count) = action_spec.max_count {
// Get the current trigger count
let trigger_count = {
let config = config_arc.lock().unwrap();
config.trigger_count
};
let trigger_count = config.trigger_count.load(Ordering::Relaxed);
if trigger_count >= max_count {
if trigger_count >= max_count as usize {
return Either::Left(FailpointResult::Continue);
}
}
@@ -314,21 +311,18 @@ pub fn failpoint_with_cancellation(
}
// Increment trigger count
{
let mut config = config_arc.lock().unwrap();
config.trigger_count += 1;
}
config.trigger_count.fetch_add(1, Ordering::Relaxed);
}
tracing::info!("Hit failpoint: {}", name);
execute_action(name, &action_spec, context, cancel_token, config_arc)
execute_action(name, &action_spec, context, cancel_token, config)
}
/// Create a notifier for a failpoint
fn create_failpoint_notifier(config_arc: Arc<std::sync::Mutex<FailpointConfig>>) -> FailpointNotifier {
let mut config = config_arc.lock().unwrap();
config.notifiers.create_notifier(config_arc.clone())
fn create_failpoint_notifier(config: Arc<FailpointConfig>) -> FailpointNotifier {
let mut notifiers = config.notifiers.lock().unwrap();
notifiers.create_notifier(config.notifiers.clone())
}
/// Execute a specific action (used for recursive execution in probability-based actions)
@@ -337,7 +331,7 @@ fn execute_action(
action_spec: &FailpointActionSpec,
_context: Option<&FailpointContext>,
cancel_token: &CancellationToken,
config_arc: Arc<std::sync::Mutex<FailpointConfig>>,
config: Arc<FailpointConfig>,
) -> Either<FailpointResult, Pin<Box<dyn Future<Output = FailpointResult> + Send>>> {
match &action_spec.action {
FailpointAction::Off => Either::Left(FailpointResult::Continue),
@@ -362,7 +356,7 @@ fn execute_action(
tracing::info!("Failpoint {} sleeping for {}ms", name, millis);
// Create a notifier for this task
let notifier = create_failpoint_notifier(config_arc);
let notifier = create_failpoint_notifier(config);
// Sleep with cancellation support
tokio::select! {
@@ -389,7 +383,7 @@ fn execute_action(
tracing::info!("Failpoint {} pausing", name);
// Create a notifier for this task
let notifier = create_failpoint_notifier(config_arc);
let notifier = create_failpoint_notifier(config);
// Wait until cancelled or notified
tokio::select! {