diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs
index 454afb95b3..46dcef11d4 100644
--- a/src/common/procedure/src/local/runner.rs
+++ b/src/common/procedure/src/local/runner.rs
@@ -17,6 +17,8 @@ use std::sync::Arc;
 use std::time::Duration;
 
 use backon::{BackoffBuilder, ExponentialBuilder};
+use common_error::ext::PlainError;
+use common_error::status_code::StatusCode;
 use common_event_recorder::EventRecorderRef;
 use common_telemetry::tracing_context::{FutureExt, TracingContext};
 use common_telemetry::{debug, error, info, tracing};
@@ -90,6 +92,51 @@ impl Drop for ProcedureGuard {
     }
 }
 
+/// Returns the lock keys on which a child procedure would conflict with its parent.
+///
+/// Evaluates the Read/Write lock compatibility matrix for every key both sides request:
+/// - Share + Share => Compatible
+/// - Exclusive + Any => Conflict
+/// - Any + Exclusive => Conflict
+///
+/// Keys the parent does not request never conflict. Each conflicting key is reported once,
+/// in the order the child first requests it.
+fn find_lock_conflicts<'a>(
+    parent_keys: impl Iterator<Item = &'a StringKey>,
+    child_keys: impl Iterator<Item = &'a StringKey>,
+) -> Vec<String> {
+    use std::collections::{HashMap, HashSet};
+
+    // Strongest mode the parent requests per key: `true` means EXCLUSIVE. An exclusive
+    // entry is never downgraded by a later shared request for the same key.
+    let mut parent_map: HashMap<&str, bool> = HashMap::new();
+    for key in parent_keys {
+        match key {
+            StringKey::Exclusive(k) => {
+                parent_map.insert(k.as_str(), true);
+            }
+            StringKey::Share(k) => {
+                parent_map.entry(k.as_str()).or_insert(false);
+            }
+        }
+    }
+
+    // Keys already reported, so a child that lists one key in both modes yields it once.
+    let mut reported = HashSet::new();
+    child_keys
+        .filter_map(|child_key| {
+            let (k, child_exclusive) = match child_key {
+                StringKey::Exclusive(k) => (k, true),
+                StringKey::Share(k) => (k, false),
+            };
+            // A key the parent does not hold cannot deadlock against the parent.
+            let parent_exclusive = *parent_map.get(k.as_str())?;
+            let conflicts = parent_exclusive || child_exclusive;
+            (conflicts && reported.insert(k.as_str())).then(|| k.clone())
+        })
+        .collect()
+}
+
 pub(crate) struct Runner {
     pub(crate) meta: ProcedureMetaRef,
     pub(crate) procedure: BoxedProcedure,
@@ -512,6 +559,41 @@ impl Runner {
 
     async fn on_suspended(&mut self, subprocedures: Vec<ProcedureWithId>) {
         let has_child = !subprocedures.is_empty();
+
+        // Pre-check: detect potential deadlocks BEFORE submitting any subprocedure.
+        // If a child shares conflicting lock keys with the parent, submitting it would
+        // cause a Hold-and-Wait deadlock — the child blocks on lock acquisition while
+        // the parent holds the lock and waits for the child to finish.
+        for sub in &subprocedures {
+            let conflicting = find_lock_conflicts(
+                self.meta.lock_key.keys_to_lock(),
+                sub.procedure.lock_key().keys_to_lock(),
+            );
+            if !conflicting.is_empty() {
+                let err_msg = format!(
+                    "Deadlock prevented: subprocedure {}-{} shares conflicting lock key(s) {:?} \
+                     with parent {}-{}. Parent holds these locks and would wait for child \
+                     completion, but child cannot acquire them.",
+                    sub.procedure.type_name(),
+                    sub.id,
+                    conflicting,
+                    self.procedure.type_name(),
+                    self.meta.id,
+                );
+                error!("{}", err_msg);
+                let err = Arc::new(Error::external(PlainError::new(
+                    err_msg,
+                    StatusCode::Internal,
+                )));
+                if self.procedure.rollback_supported() {
+                    self.meta.set_state(ProcedureState::prepare_rollback(err));
+                } else {
+                    self.meta.set_state(ProcedureState::failed(err));
+                }
+                return;
+            }
+        }
+
         for subprocedure in subprocedures {
             info!(
                 "Procedure {}-{} submit subprocedure {}-{}",
@@ -1939,4 +2021,181 @@ mod tests {
         join_all(tasks).await;
         assert_eq!(shared_atomic_value.load(Ordering::Relaxed), 2);
     }
+
+    #[tokio::test]
+    async fn test_on_suspend_deadlock_detected_no_rollback() {
+        // Parent holds Exclusive("catalog.schema.table"), child also requests Exclusive("catalog.schema.table").
+        // Since parent does NOT support rollback, state should become Failed.
+        let child_id = ProcedureId::random();
+        let exec_fn = move |_| {
+            async move {
+                let child_exec_fn = |_| async { Ok(Status::done()) }.boxed();
+                let child = ProcedureAdapter {
+                    data: "child".to_string(),
+                    lock_key: LockKey::single_exclusive("catalog.schema.table"),
+                    poison_keys: PoisonKeys::default(),
+                    exec_fn: child_exec_fn,
+                    rollback_fn: None,
+                };
+                Ok(Status::Suspended {
+                    subprocedures: vec![ProcedureWithId {
+                        id: child_id,
+                        procedure: Box::new(child),
+                    }],
+                    persist: false,
+                })
+            }
+            .boxed()
+        };
+        let parent = ProcedureAdapter {
+            data: "parent".to_string(),
+            lock_key: LockKey::single_exclusive("catalog.schema.table"),
+            poison_keys: PoisonKeys::default(),
+            exec_fn,
+            rollback_fn: None, // No rollback support
+        };
+
+        let dir = create_temp_dir("deadlock_no_rollback");
+        let meta = parent.new_meta(ROOT_ID);
+        let ctx = context_without_provider(meta.id);
+        let object_store = test_util::new_object_store(&dir);
+        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
+        let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store);
+        runner.manager_ctx.start();
+
+        runner.execute_once(&ctx).await;
+        let state = runner.meta.state();
+        assert!(state.is_failed(), "Expected Failed, got {state:?}");
+        // Verify the error exists
+        assert!(
+            state.error().is_some(),
+            "Failed state should contain an error"
+        );
+        // Child should NOT have been submitted
+        assert!(
+            !runner.manager_ctx.contains_procedure(child_id),
+            "Child procedure should not be submitted when deadlock is detected"
+        );
+    }
+
+    #[tokio::test]
+    async fn test_on_suspend_deadlock_detected_with_rollback() {
+        // Parent holds Exclusive("catalog.schema.table"), child also requests Exclusive("catalog.schema.table").
+        // Since parent DOES support rollback, state should become PrepareRollback.
+        let child_id = ProcedureId::random();
+        let exec_fn = move |_| {
+            async move {
+                let child_exec_fn = |_| async { Ok(Status::done()) }.boxed();
+                let child = ProcedureAdapter {
+                    data: "child".to_string(),
+                    lock_key: LockKey::single_exclusive("catalog.schema.table"),
+                    poison_keys: PoisonKeys::default(),
+                    exec_fn: child_exec_fn,
+                    rollback_fn: None,
+                };
+                Ok(Status::Suspended {
+                    subprocedures: vec![ProcedureWithId {
+                        id: child_id,
+                        procedure: Box::new(child),
+                    }],
+                    persist: false,
+                })
+            }
+            .boxed()
+        };
+        let rollback_fn = move |_| async move { Ok(()) }.boxed();
+        let parent = ProcedureAdapter {
+            data: "parent".to_string(),
+            lock_key: LockKey::single_exclusive("catalog.schema.table"),
+            poison_keys: PoisonKeys::default(),
+            exec_fn,
+            rollback_fn: Some(Box::new(rollback_fn)), // Supports rollback
+        };
+
+        let dir = create_temp_dir("deadlock_with_rollback");
+        let meta = parent.new_meta(ROOT_ID);
+        let ctx = context_without_provider(meta.id);
+        let object_store = test_util::new_object_store(&dir);
+        let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
+        let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store);
+        runner.manager_ctx.start();
+
+        runner.execute_once(&ctx).await;
+        let state = runner.meta.state();
+        assert!(
+            state.is_prepare_rollback(),
+            "Expected PrepareRollback, got {state:?}"
+        );
+        // Verify the error exists in PrepareRollback variant
+        match &state {
+            ProcedureState::PrepareRollback { error } => {
+                assert!(!error.to_string().is_empty(), "Error should not be empty");
+            }
+            _ => panic!("Expected PrepareRollback, got {state:?}"),
+        }
+        // Child should NOT have been submitted
+        assert!(
+            !runner.manager_ctx.contains_procedure(child_id),
+            "Child procedure should not be submitted when deadlock is detected"
+        );
+    }
+
+    #[test]
+    fn test_find_lock_conflicts() {
+        use crate::procedure::StringKey;
+
+        // 1. Share + Share = No conflict (Compatible)
+        let parent = [StringKey::Share("A".to_string())];
+        let child = [StringKey::Share("A".to_string())];
+        assert!(super::find_lock_conflicts(parent.iter(), child.iter()).is_empty());
+
+        // 2. Share + Exclusive = Conflict
+        let parent = [StringKey::Share("A".to_string())];
+        let child = [StringKey::Exclusive("A".to_string())];
+        assert_eq!(
+            super::find_lock_conflicts(parent.iter(), child.iter()),
+            vec!["A".to_string()]
+        );
+
+        // 3. Exclusive + Share = Conflict
+        let parent = [StringKey::Exclusive("A".to_string())];
+        let child = [StringKey::Share("A".to_string())];
+        assert_eq!(
+            super::find_lock_conflicts(parent.iter(), child.iter()),
+            vec!["A".to_string()]
+        );
+
+        // 4. Exclusive + Exclusive = Conflict
+        let parent = [StringKey::Exclusive("A".to_string())];
+        let child = [StringKey::Exclusive("A".to_string())];
+        assert_eq!(
+            super::find_lock_conflicts(parent.iter(), child.iter()),
+            vec!["A".to_string()]
+        );
+
+        // 5. Multiple keys, partial overlap
+        let parent = [
+            StringKey::Share("A".to_string()),
+            StringKey::Exclusive("B".to_string()),
+        ];
+        let child = [
+            StringKey::Exclusive("A".to_string()), // Conflict with Share("A")
+            StringKey::Share("B".to_string()),     // Conflict with Exclusive("B")
+            StringKey::Exclusive("C".to_string()), // No conflict, parent doesn't hold C
+        ];
+        let mut conflicts = super::find_lock_conflicts(parent.iter(), child.iter());
+        conflicts.sort();
+        assert_eq!(conflicts, vec!["A".to_string(), "B".to_string()]);
+
+        // 6. Child requests one key in both modes = reported once
+        let parent = [StringKey::Exclusive("A".to_string())];
+        let child = [
+            StringKey::Share("A".to_string()),
+            StringKey::Exclusive("A".to_string()),
+        ];
+        assert_eq!(
+            super::find_lock_conflicts(parent.iter(), child.iter()),
+            vec!["A".to_string()]
+        );
+    }
 }