mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-31 12:20:38 +00:00
feat(procedure): detect potential deadlock when parent/child procedures share lock keys (#7752)
* feat(procedure): detect potential deadlock when parent/child share lock keys Add a deadlock detection mechanism in submit_subprocedure() to warn when a child procedure's lock_key overlaps with its parent's lock_key. When this happens, the parent holds the lock while waiting for the child to complete (at child_notify.notified().await), but the child blocks forever trying to acquire the same lock. This is a classic Hold-and-Wait deadlock. The detection: - Emits a warn! log in all builds (visible in production) - Triggers debug_assert!(false) in debug/test builds for early CI detection This partially addresses the TODO at line 121-122 and is a follow-up to the discussion in: https://github.com/GreptimeTeam/greptimedb/issues/7692 Signed-off-by: YZL0v3ZZ <2055877225@qq.com> * style: fix trailing whitespace Signed-off-by: YZL0v3ZZ <2055877225@qq.com> * refactor(procedure): extract deadlock detection into a testable pure function Signed-off-by: YZL0v3ZZ <2055877225@qq.com> * fix(procedure): preserve lock mode when detecting parent/child deadlock Signed-off-by: YZL0v3ZZ <2055877225@qq.com> * re-run ci check Signed-off-by: YZL0v3ZZ <2055877225@qq.com> --------- Signed-off-by: YZL0v3ZZ <2055877225@qq.com>
This commit is contained in:
@@ -17,6 +17,8 @@ use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use backon::{BackoffBuilder, ExponentialBuilder};
|
||||
use common_error::ext::PlainError;
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_event_recorder::EventRecorderRef;
|
||||
use common_telemetry::tracing_context::{FutureExt, TracingContext};
|
||||
use common_telemetry::{debug, error, info, tracing};
|
||||
@@ -90,6 +92,45 @@ impl Drop for ProcedureGuard {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a list of conflicting lock keys between a parent and a child procedure.
|
||||
/// Evaluates the Read/Write lock compatibility matrix:
|
||||
/// - Share + Share => Compatible
|
||||
/// - Exclusive + Any => Conflict
|
||||
/// - Any + Exclusive => Conflict
|
||||
fn find_lock_conflicts<'a>(
|
||||
parent_keys: impl Iterator<Item = &'a StringKey>,
|
||||
child_keys: impl Iterator<Item = &'a StringKey>,
|
||||
) -> Vec<String> {
|
||||
use std::collections::HashMap;
|
||||
|
||||
// Map from key string slice (&str) to a boolean indicating if the parent holds it EXCLUSIVELY.
|
||||
let mut parent_map = HashMap::new();
|
||||
for key in parent_keys {
|
||||
match key {
|
||||
StringKey::Exclusive(k) => {
|
||||
parent_map.insert(k.as_str(), true);
|
||||
}
|
||||
StringKey::Share(k) => {
|
||||
parent_map.entry(k.as_str()).or_insert(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
child_keys
|
||||
.filter_map(|child_key| match child_key {
|
||||
StringKey::Exclusive(k) | StringKey::Share(k)
|
||||
if parent_map.get(k.as_str()) == Some(&true) =>
|
||||
{
|
||||
Some(k.clone())
|
||||
}
|
||||
StringKey::Exclusive(k) if parent_map.get(k.as_str()) == Some(&false) => {
|
||||
Some(k.clone())
|
||||
}
|
||||
_ => None,
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub(crate) struct Runner {
|
||||
pub(crate) meta: ProcedureMetaRef,
|
||||
pub(crate) procedure: BoxedProcedure,
|
||||
@@ -512,6 +553,41 @@ impl Runner {
|
||||
|
||||
async fn on_suspended(&mut self, subprocedures: Vec<ProcedureWithId>) {
|
||||
let has_child = !subprocedures.is_empty();
|
||||
|
||||
// Pre-check: detect potential deadlocks BEFORE submitting any subprocedure.
|
||||
// If a child shares conflicting lock keys with the parent, submitting it would
|
||||
// cause a Hold-and-Wait deadlock — the child blocks on lock acquisition while
|
||||
// the parent holds the lock and waits for the child to finish.
|
||||
for sub in &subprocedures {
|
||||
let conflicting = find_lock_conflicts(
|
||||
self.meta.lock_key.keys_to_lock(),
|
||||
sub.procedure.lock_key().keys_to_lock(),
|
||||
);
|
||||
if !conflicting.is_empty() {
|
||||
let err_msg = format!(
|
||||
"Deadlock prevented: subprocedure {}-{} shares conflicting lock key(s) {:?} \
|
||||
with parent {}-{}. Parent holds these locks and would wait for child \
|
||||
completion, but child cannot acquire them.",
|
||||
sub.procedure.type_name(),
|
||||
sub.id,
|
||||
conflicting,
|
||||
self.procedure.type_name(),
|
||||
self.meta.id,
|
||||
);
|
||||
error!("{}", err_msg);
|
||||
let err = Arc::new(Error::external(PlainError::new(
|
||||
err_msg,
|
||||
StatusCode::Internal,
|
||||
)));
|
||||
if self.procedure.rollback_supported() {
|
||||
self.meta.set_state(ProcedureState::prepare_rollback(err));
|
||||
} else {
|
||||
self.meta.set_state(ProcedureState::failed(err));
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
for subprocedure in subprocedures {
|
||||
info!(
|
||||
"Procedure {}-{} submit subprocedure {}-{}",
|
||||
@@ -1939,4 +2015,169 @@ mod tests {
|
||||
join_all(tasks).await;
|
||||
assert_eq!(shared_atomic_value.load(Ordering::Relaxed), 2);
|
||||
}
|
||||
#[tokio::test]
|
||||
async fn test_on_suspend_deadlock_detected_no_rollback() {
|
||||
// Parent holds Exclusive("catalog.schema.table"), child also requests Exclusive("catalog.schema.table").
|
||||
// Since parent does NOT support rollback, state should become Failed.
|
||||
let child_id = ProcedureId::random();
|
||||
let exec_fn = move |_| {
|
||||
async move {
|
||||
let child_exec_fn = |_| async { Ok(Status::done()) }.boxed();
|
||||
let child = ProcedureAdapter {
|
||||
data: "child".to_string(),
|
||||
lock_key: LockKey::single_exclusive("catalog.schema.table"),
|
||||
poison_keys: PoisonKeys::default(),
|
||||
exec_fn: child_exec_fn,
|
||||
rollback_fn: None,
|
||||
};
|
||||
Ok(Status::Suspended {
|
||||
subprocedures: vec![ProcedureWithId {
|
||||
id: child_id,
|
||||
procedure: Box::new(child),
|
||||
}],
|
||||
persist: false,
|
||||
})
|
||||
}
|
||||
.boxed()
|
||||
};
|
||||
let parent = ProcedureAdapter {
|
||||
data: "parent".to_string(),
|
||||
lock_key: LockKey::single_exclusive("catalog.schema.table"),
|
||||
poison_keys: PoisonKeys::default(),
|
||||
exec_fn,
|
||||
rollback_fn: None, // No rollback support
|
||||
};
|
||||
|
||||
let dir = create_temp_dir("deadlock_no_rollback");
|
||||
let meta = parent.new_meta(ROOT_ID);
|
||||
let ctx = context_without_provider(meta.id);
|
||||
let object_store = test_util::new_object_store(&dir);
|
||||
let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
|
||||
let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store);
|
||||
runner.manager_ctx.start();
|
||||
|
||||
runner.execute_once(&ctx).await;
|
||||
let state = runner.meta.state();
|
||||
assert!(state.is_failed(), "Expected Failed, got {state:?}");
|
||||
// Verify the error exists
|
||||
assert!(
|
||||
state.error().is_some(),
|
||||
"Failed state should contain an error"
|
||||
);
|
||||
// Child should NOT have been submitted
|
||||
assert!(
|
||||
!runner.manager_ctx.contains_procedure(child_id),
|
||||
"Child procedure should not be submitted when deadlock is detected"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_on_suspend_deadlock_detected_with_rollback() {
|
||||
// Parent holds Exclusive("catalog.schema.table"), child also requests Exclusive("catalog.schema.table").
|
||||
// Since parent DOES support rollback, state should become PrepareRollback.
|
||||
let child_id = ProcedureId::random();
|
||||
let exec_fn = move |_| {
|
||||
async move {
|
||||
let child_exec_fn = |_| async { Ok(Status::done()) }.boxed();
|
||||
let child = ProcedureAdapter {
|
||||
data: "child".to_string(),
|
||||
lock_key: LockKey::single_exclusive("catalog.schema.table"),
|
||||
poison_keys: PoisonKeys::default(),
|
||||
exec_fn: child_exec_fn,
|
||||
rollback_fn: None,
|
||||
};
|
||||
Ok(Status::Suspended {
|
||||
subprocedures: vec![ProcedureWithId {
|
||||
id: child_id,
|
||||
procedure: Box::new(child),
|
||||
}],
|
||||
persist: false,
|
||||
})
|
||||
}
|
||||
.boxed()
|
||||
};
|
||||
let rollback_fn = move |_| async move { Ok(()) }.boxed();
|
||||
let parent = ProcedureAdapter {
|
||||
data: "parent".to_string(),
|
||||
lock_key: LockKey::single_exclusive("catalog.schema.table"),
|
||||
poison_keys: PoisonKeys::default(),
|
||||
exec_fn,
|
||||
rollback_fn: Some(Box::new(rollback_fn)), // Supports rollback
|
||||
};
|
||||
|
||||
let dir = create_temp_dir("deadlock_with_rollback");
|
||||
let meta = parent.new_meta(ROOT_ID);
|
||||
let ctx = context_without_provider(meta.id);
|
||||
let object_store = test_util::new_object_store(&dir);
|
||||
let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
|
||||
let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store);
|
||||
runner.manager_ctx.start();
|
||||
|
||||
runner.execute_once(&ctx).await;
|
||||
let state = runner.meta.state();
|
||||
assert!(
|
||||
state.is_prepare_rollback(),
|
||||
"Expected PrepareRollback, got {state:?}"
|
||||
);
|
||||
// Verify the error exists in PrepareRollback variant
|
||||
match &state {
|
||||
ProcedureState::PrepareRollback { error } => {
|
||||
assert!(!error.to_string().is_empty(), "Error should not be empty");
|
||||
}
|
||||
_ => panic!("Expected PrepareRollback, got {state:?}"),
|
||||
}
|
||||
// Child should NOT have been submitted
|
||||
assert!(
|
||||
!runner.manager_ctx.contains_procedure(child_id),
|
||||
"Child procedure should not be submitted when deadlock is detected"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_find_lock_conflicts() {
|
||||
use crate::procedure::StringKey;
|
||||
|
||||
// 1. Share + Share = No conflict (Compatible)
|
||||
let parent = [StringKey::Share("A".to_string())];
|
||||
let child = [StringKey::Share("A".to_string())];
|
||||
assert!(super::find_lock_conflicts(parent.iter(), child.iter()).is_empty());
|
||||
|
||||
// 2. Share + Exclusive = Conflict
|
||||
let parent = [StringKey::Share("A".to_string())];
|
||||
let child = [StringKey::Exclusive("A".to_string())];
|
||||
assert_eq!(
|
||||
super::find_lock_conflicts(parent.iter(), child.iter()),
|
||||
vec!["A".to_string()]
|
||||
);
|
||||
|
||||
// 3. Exclusive + Share = Conflict
|
||||
let parent = [StringKey::Exclusive("A".to_string())];
|
||||
let child = [StringKey::Share("A".to_string())];
|
||||
assert_eq!(
|
||||
super::find_lock_conflicts(parent.iter(), child.iter()),
|
||||
vec!["A".to_string()]
|
||||
);
|
||||
|
||||
// 4. Exclusive + Exclusive = Conflict
|
||||
let parent = [StringKey::Exclusive("A".to_string())];
|
||||
let child = [StringKey::Exclusive("A".to_string())];
|
||||
assert_eq!(
|
||||
super::find_lock_conflicts(parent.iter(), child.iter()),
|
||||
vec!["A".to_string()]
|
||||
);
|
||||
|
||||
// 5. Multiple keys, partial overlap
|
||||
let parent = [
|
||||
StringKey::Share("A".to_string()),
|
||||
StringKey::Exclusive("B".to_string()),
|
||||
];
|
||||
let child = [
|
||||
StringKey::Exclusive("A".to_string()), // Conflict with Share("A")
|
||||
StringKey::Share("B".to_string()), // Conflict with Exclusive("B")
|
||||
StringKey::Exclusive("C".to_string()), // No conflict, parent doesn't hold C
|
||||
];
|
||||
let mut conflicts = super::find_lock_conflicts(parent.iter(), child.iter());
|
||||
conflicts.sort();
|
||||
assert_eq!(conflicts, vec!["A".to_string(), "B".to_string()]);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user