Reduce lock contention: give each failpoint its own mutex instead of re-taking the global registry lock

This commit is contained in:
Christian Schwarz
2025-07-11 17:55:13 +00:00
parent 78ad89b4d5
commit 8662463b06

View File

@@ -26,7 +26,7 @@ pub use either; // re-export for use in macros
/// Global failpoint registry
// TODO: switch to simple_rcu, but it's in `utils`
static FAILPOINTS: Lazy<Arc<std::sync::RwLock<HashMap<String, FailpointConfig>>>> =
static FAILPOINTS: Lazy<Arc<std::sync::RwLock<HashMap<String, Arc<std::sync::Mutex<FailpointConfig>>>>>> =
Lazy::new(|| Default::default());
/// Configuration for a single failpoint
@@ -191,10 +191,10 @@ pub fn configure_failpoint(name: &str, actions: &str) -> Result<()> {
// If this failpoint already exists, notify all waiting tasks
if let Some(existing_config) = failpoints.get(name) {
// Notify all waiting tasks about the configuration change
existing_config.notifiers.notify_all();
existing_config.lock().unwrap().notifiers.notify_all();
}
failpoints.insert(name.to_string(), config);
failpoints.insert(name.to_string(), Arc::new(std::sync::Mutex::new(config)));
tracing::info!("Configured failpoint: {} = {}", name, actions);
Ok(())
@@ -219,10 +219,10 @@ pub fn configure_failpoint_with_context(
// If this failpoint already exists, notify all waiting tasks
if let Some(existing_config) = failpoints.get(name) {
// Notify all waiting tasks about the configuration change
existing_config.notifiers.notify_all();
existing_config.lock().unwrap().notifiers.notify_all();
}
failpoints.insert(name.to_string(), config);
failpoints.insert(name.to_string(), Arc::new(std::sync::Mutex::new(config)));
tracing::info!("Configured failpoint with context: {} = {}", name, actions);
Ok(())
@@ -234,7 +234,7 @@ pub fn remove_failpoint(name: &str) {
// Notify all waiting tasks before removing
if let Some(existing_config) = failpoints.get(name) {
existing_config.notifiers.notify_all();
existing_config.lock().unwrap().notifiers.notify_all();
}
failpoints.remove(name);
@@ -252,7 +252,7 @@ pub fn list() -> Vec<(impl std::fmt::Display, impl std::fmt::Display)> {
.read()
.unwrap()
.iter()
.map(|(name, config)| (name.clone(), format!("{config:?}")))
.map(|(name, config)| (name.clone(), format!("{:?}", config.lock().unwrap())))
.collect::<Vec<_>>()
}
@@ -275,12 +275,19 @@ pub fn failpoint_with_cancellation(
return Either::Left(FailpointResult::Continue);
}
// Check if the failpoint exists and get the necessary info
let (action_spec, context_matchers) = {
// Get a clone of the failpoint config Arc - this minimizes the time we hold the global lock
let config_arc = {
let failpoints = FAILPOINTS.read().unwrap();
let Some(config) = failpoints.get(name) else {
return Either::Left(FailpointResult::Continue);
};
config.clone()
};
// Global lock is dropped here
// Now work with the individual config's lock
let (action_spec, context_matchers) = {
let config = config_arc.lock().unwrap();
(config.action_spec.clone(), config.context_matchers.clone())
};
@@ -297,11 +304,8 @@ pub fn failpoint_with_cancellation(
if let Some(max_count) = action_spec.max_count {
// Get the current trigger count
let trigger_count = {
let failpoints = FAILPOINTS.read().unwrap();
failpoints
.get(name)
.map(|config| config.trigger_count)
.unwrap_or(0)
let config = config_arc.lock().unwrap();
config.trigger_count
};
if trigger_count >= max_count {
@@ -318,34 +322,25 @@ pub fn failpoint_with_cancellation(
// Increment trigger count
{
let mut failpoints = FAILPOINTS.write().unwrap();
if let Some(fp_config) = failpoints.get_mut(name) {
fp_config.trigger_count += 1;
}
let mut config = config_arc.lock().unwrap();
config.trigger_count += 1;
}
}
tracing::info!("Hit failpoint: {}", name);
execute_action(name, &action_spec, context, cancel_token)
execute_action(name, &action_spec, context, cancel_token, config_arc)
}
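/// Create a notifier registered on the given failpoint config; the cleanup
/// callback handed to `create_notifier` removes it from that same config again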
fn create_failpoint_notifier(name: &str) -> FailpointNotifier {
let mut failpoints = FAILPOINTS.write().unwrap();
if let Some(fp_config) = failpoints.get_mut(name) {
let cleanup_name = name.to_string();
fp_config.notifiers.create_notifier(move |notifier| {
let mut failpoints = FAILPOINTS.write().unwrap();
if let Some(fp_config) = failpoints.get_mut(&cleanup_name) {
fp_config.notifiers.remove_notifier(notifier);
}
})
} else {
// Failpoint doesn't exist, create a dummy notifier
let notifier = Arc::new(Notify::new());
FailpointNotifier::new(notifier, |_| {})
}
fn create_failpoint_notifier(config_arc: Arc<std::sync::Mutex<FailpointConfig>>) -> FailpointNotifier {
let cleanup_config_arc = config_arc.clone();
let mut config = config_arc.lock().unwrap();
config.notifiers.create_notifier(move |notifier| {
let mut config = cleanup_config_arc.lock().unwrap();
config.notifiers.remove_notifier(notifier);
})
}
/// Execute a specific action (used for recursive execution in probability-based actions)
@@ -354,6 +349,7 @@ fn execute_action(
action_spec: &FailpointActionSpec,
_context: Option<&FailpointContext>,
cancel_token: &CancellationToken,
config_arc: Arc<std::sync::Mutex<FailpointConfig>>,
) -> Either<FailpointResult, Pin<Box<dyn Future<Output = FailpointResult> + Send>>> {
match &action_spec.action {
FailpointAction::Off => Either::Left(FailpointResult::Continue),
@@ -378,7 +374,7 @@ fn execute_action(
tracing::info!("Failpoint {} sleeping for {}ms", name, millis);
// Create a notifier for this task
let notifier = create_failpoint_notifier(&name);
let notifier = create_failpoint_notifier(config_arc);
// Sleep with cancellation support
tokio::select! {
@@ -405,7 +401,7 @@ fn execute_action(
tracing::info!("Failpoint {} pausing", name);
// Create a notifier for this task
let notifier = create_failpoint_notifier(&name);
let notifier = create_failpoint_notifier(config_arc);
// Wait until cancelled or notified
tokio::select! {