diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index 9e8536308c..5e8717a53a 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -24,7 +24,7 @@ use async_trait::async_trait; use backon::ExponentialBuilder; use common_error::ext::BoxedError; use common_event_recorder::EventRecorderRef; -use common_runtime::{RepeatedTask, TaskFunction}; +use common_runtime::{JoinHandle, RepeatedTask, TaskFunction}; use common_telemetry::tracing_context::{FutureExt, TracingContext}; use common_telemetry::{error, info, tracing}; use snafu::{OptionExt, ResultExt, ensure}; @@ -254,6 +254,8 @@ pub(crate) struct ManagerContext { running_procedures: Mutex>, /// Ids and finished time of finished procedures. finished_procedures: Mutex>, + /// Runner tasks of procedures. + runner_tasks: Mutex>>, /// Running flag. running: Arc, /// Poison manager. @@ -310,6 +312,7 @@ impl ManagerContext { procedures: RwLock::new(HashMap::new()), running_procedures: Mutex::new(HashSet::new()), finished_procedures: Mutex::new(VecDeque::new()), + runner_tasks: Mutex::new(HashMap::new()), running: Arc::new(AtomicBool::new(false)), poison_manager, } @@ -329,6 +332,76 @@ impl ManagerContext { self.running.store(false, Ordering::Relaxed); } + fn reset_runtime_state(&self) { + self.procedures.write().unwrap().clear(); + self.running_procedures.lock().unwrap().clear(); + self.finished_procedures.lock().unwrap().clear(); + for handle in self + .runner_tasks + .lock() + .unwrap() + .drain() + .map(|(_, handle)| handle) + { + handle.abort(); + } + self.key_lock.clear(); + self.dynamic_key_lock.clear(); + } + + fn spawn_runner_task(&self, procedure_id: ProcedureId, spawn: F) -> bool + where + F: FnOnce() -> JoinHandle<()>, + { + let mut tasks = self.runner_tasks.lock().unwrap(); + if !self.running() { + return false; + } + + let handle = spawn(); + let _ = tasks.insert(procedure_id, handle); + true + } + + fn remove_procedure(&self, procedure_id: ProcedureId) { + self.procedures.write().unwrap().remove(&procedure_id); + self.running_procedures + .lock() + .unwrap() + .remove(&procedure_id); + } + + pub(crate) fn remove_runner_task(&self, procedure_id: ProcedureId) { + let _ = self.runner_tasks.lock().unwrap().remove(&procedure_id); + } + + fn take_runner_tasks(&self) -> Vec> { + self.runner_tasks + .lock() + .unwrap() + .drain() + .map(|(_, handle)| handle) + .collect() + } + + async fn abort_runner_tasks(&self) { + let handles = self.take_runner_tasks(); + + for handle in &handles { + handle.abort(); + } + + for handle in handles { + if let Err(e) = handle.await + && !e.is_cancelled() + { + error!( + e; "Procedure runner task exits unexpectedly during stop", + ); + } + } + } + /// Return `ProcedureManager` is running. pub(crate) fn running(&self) -> bool { self.running.load(Ordering::Relaxed) @@ -675,17 +748,25 @@ impl LocalManager { let tracing_context = TracingContext::from_current_span(); - let _handle = common_runtime::spawn_global(async move { - let span = tracing_context.attach(tracing::info_span!( - "LocalManager::submit_root_procedure", - procedure_name = %runner.meta.type_name, - procedure_id = %runner.meta.id, - )); - // Run the root procedure. - // The task was moved to another runtime for execution. - // In order not to interrupt tracing, a span needs to be created to continue tracing the current task. - runner.run().trace(span).await; - }); + ensure!( + self.manager_ctx.spawn_runner_task(procedure_id, || { + common_runtime::spawn_global(async move { + let span = tracing_context.attach(tracing::info_span!( + "LocalManager::submit_root_procedure", + procedure_name = %runner.meta.type_name, + procedure_id = %runner.meta.id, + )); + // Run the root procedure. + // The task was moved to another runtime for execution. + // In order not to interrupt tracing, a span needs to be created to continue tracing the current task. + runner.run().trace(span).await; + }) + }), + { + self.manager_ctx.remove_procedure(procedure_id); + ManagerNotStartSnafu + } + ); Ok(watcher) } @@ -822,6 +903,7 @@ impl ProcedureManager for LocalManager { *task = Some(task_inner); + self.manager_ctx.reset_runtime_state(); self.manager_ctx.start(); info!("LocalManager is start."); @@ -830,14 +912,18 @@ impl ProcedureManager for LocalManager { } async fn stop(&self) -> Result<()> { - let mut task = self.remove_outdated_meta_task.lock().await; - - if let Some(task) = task.take() { - task.stop().await.context(StopRemoveOutdatedMetaTaskSnafu)?; - } - self.manager_ctx.stop(); + let mut task = self.remove_outdated_meta_task.lock().await; + if let Some(task) = task.take() + && let Err(e) = task.stop().await.context(StopRemoveOutdatedMetaTaskSnafu) + { + error!(e; "Failed to stop remove outdated meta task"); + }; + + self.manager_ctx.abort_runner_tasks().await; + self.manager_ctx.reset_runtime_state(); + info!("LocalManager is stopped."); Ok(()) @@ -921,10 +1007,12 @@ pub(crate) mod test_util { #[cfg(test)] mod tests { use std::assert_matches; + use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering}; use common_error::mock::MockError; use common_error::status_code::StatusCode; use common_test_util::temp_dir::create_temp_dir; + use tokio::sync::oneshot; use tokio::time::timeout; use super::*; @@ -954,6 +1042,67 @@ mod tests { assert!(ctx.state(meta.id).unwrap().is_done()); } + #[test] + fn test_reset_runtime_state() { + let ctx = new_test_manager_context(); + ctx.set_running(); + let mut meta = test_util::procedure_meta_for_test(); + meta.lock_key = LockKey::single_exclusive("test.reset_runtime_state"); + let meta = Arc::new(meta); + let procedure_id = meta.id; + + assert!(ctx.try_insert_procedure(meta.clone())); + ctx.finished_procedures + .lock() + .unwrap() + .push_back((procedure_id, Instant::now())); + ctx.spawn_runner_task(procedure_id, || { + common_runtime::spawn_global(std::future::pending::<()>()) + }); + + drop( + ctx.key_lock + .try_write("test.reset_runtime_state".to_string()), + ); + drop( + ctx.dynamic_key_lock + .try_write("test.reset_runtime_state.dynamic".to_string()), + ); + assert!(ctx.contains_procedure(procedure_id)); + assert_eq!(1, ctx.running_procedures.lock().unwrap().len()); + assert_eq!(1, ctx.finished_procedures.lock().unwrap().len()); + assert_eq!(1, ctx.runner_tasks.lock().unwrap().len()); + assert_eq!(1, ctx.key_lock.len()); + assert_eq!(1, ctx.dynamic_key_lock.len()); + + ctx.reset_runtime_state(); + + assert!(!ctx.contains_procedure(procedure_id)); + assert!(ctx.running_procedures.lock().unwrap().is_empty()); + assert!(ctx.finished_procedures.lock().unwrap().is_empty()); + assert!(ctx.runner_tasks.lock().unwrap().is_empty()); + assert!(ctx.key_lock.is_empty()); + assert!(ctx.dynamic_key_lock.is_empty()); + } + + #[test] + fn test_spawn_runner_task_not_started_after_stop() { + let ctx = new_test_manager_context(); + let procedure_id = ProcedureId::random(); + + let spawned = Arc::new(AtomicBool::new(false)); + let spawned_in_task = spawned.clone(); + let started = ctx.spawn_runner_task(procedure_id, || { + common_runtime::spawn_global(async move { + spawned_in_task.store(true, AtomicOrdering::Relaxed); + }) + }); + + assert!(!started); + assert!(!spawned.load(AtomicOrdering::Relaxed)); + assert!(ctx.runner_tasks.lock().unwrap().is_empty()); + } + #[test] fn test_manager_context_insert_duplicate() { let ctx = new_test_manager_context(); @@ -1046,6 +1195,105 @@ mod tests { } } + #[derive(Debug)] + struct BlockingProcedure { + started_tx: Option>, + dropped: Arc, + lock_key: LockKey, + } + + impl Drop for BlockingProcedure { + fn drop(&mut self) { + self.dropped.store(true, AtomicOrdering::Relaxed); + } + } + + #[async_trait] + impl Procedure for BlockingProcedure { + fn type_name(&self) -> &str { + "BlockingProcedure" + } + + async fn execute(&mut self, _ctx: &Context) -> Result { + if let Some(tx) = self.started_tx.take() { + let _ = tx.send(()); + } + std::future::pending::>().await + } + + fn dump(&self) -> Result { + Ok(String::new()) + } + + fn lock_key(&self) -> LockKey { + self.lock_key.clone() + } + } + + #[tokio::test] + async fn test_stop_aborts_runner_and_resets_runtime_state() { + let dir = create_temp_dir("stop_aborts_runner_and_resets_runtime_state"); + let config = ManagerConfig::default(); + let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); + let poison_manager = Arc::new(InMemoryPoisonStore::new()); + let manager = LocalManager::new(config, state_store, poison_manager, None, None); + manager.start().await.unwrap(); + + let procedure_id = ProcedureId::random(); + let (started_tx, started_rx) = oneshot::channel(); + let dropped = Arc::new(AtomicBool::new(false)); + let procedure = BlockingProcedure { + started_tx: Some(started_tx), + dropped: dropped.clone(), + lock_key: LockKey::single_exclusive("test.stop_aborts_runner"), + }; + + manager + .submit(ProcedureWithId { + id: procedure_id, + procedure: Box::new(procedure), + }) + .await + .unwrap(); + timeout(Duration::from_secs(5), started_rx) + .await + .unwrap() + .unwrap(); + + assert!(manager.manager_ctx.contains_procedure(procedure_id)); + assert_eq!( + 1, + manager.manager_ctx.running_procedures.lock().unwrap().len() + ); + assert_eq!(1, manager.manager_ctx.runner_tasks.lock().unwrap().len()); + assert_eq!(1, manager.manager_ctx.key_lock.len()); + + manager.stop().await.unwrap(); + + assert!(dropped.load(AtomicOrdering::Relaxed)); + assert!(!manager.manager_ctx.running()); + assert!(!manager.manager_ctx.contains_procedure(procedure_id)); + assert!( + manager + .manager_ctx + .running_procedures + .lock() + .unwrap() + .is_empty() + ); + assert!( + manager + .manager_ctx + .finished_procedures + .lock() + .unwrap() + .is_empty() + ); + assert!(manager.manager_ctx.runner_tasks.lock().unwrap().is_empty()); + assert!(manager.manager_ctx.key_lock.is_empty()); + assert!(manager.manager_ctx.dynamic_key_lock.is_empty()); + } + #[test] fn test_register_loader() { let dir = create_temp_dir("register"); @@ -1439,7 +1687,7 @@ mod tests { let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); let poison_manager = Arc::new(InMemoryPoisonStore::new()); let manager = LocalManager::new(config, state_store, poison_manager, None, None); - manager.manager_ctx.set_running(); + manager.start().await.unwrap(); manager .manager_ctx @@ -1447,7 +1695,6 @@ mod tests { .lock() .unwrap() .insert(ProcedureId::random()); - manager.start().await.unwrap(); // Submit a new procedure should fail. let mut procedure = ProcedureToLoad::new("submit"); diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs index ca3e221f43..509b3a7756 100644 --- a/src/common/procedure/src/local/runner.rs +++ b/src/common/procedure/src/local/runner.rs @@ -20,6 +20,7 @@ use backon::{BackoffBuilder, ExponentialBuilder}; use common_error::ext::PlainError; use common_error::status_code::StatusCode; use common_event_recorder::EventRecorderRef; +use common_telemetry::tracing::warn; use common_telemetry::tracing_context::{FutureExt, TracingContext}; use common_telemetry::{debug, error, info, tracing}; use rand::Rng; @@ -480,6 +481,15 @@ impl Runner { procedure_state: ProcedureState, procedure: BoxedProcedure, ) { + if !self.running() { + warn!( + "ProcedureManager is not running, skip submitting subprocedure {}-{}", + procedure.type_name(), + procedure_id + ); + return; + } + if self.manager_ctx.contains_procedure(procedure_id) { // If the parent has already submitted this procedure, don't submit it again. return; @@ -520,23 +530,29 @@ impl Runner { procedure_id, ); - // Add the id of the subprocedure to the metadata. - self.meta.push_child(procedure_id); let parent_id = self.meta.id; let tracing_context = TracingContext::from_current_span(); - let _handle = common_runtime::spawn_global(async move { - let span = tracing_context.attach(tracing::info_span!( - "LocalManager::submit_subprocedure", - procedure_name = %runner.meta.type_name, - procedure_id = %runner.meta.id, - parent_id = %parent_id, - )); - // Run the root procedure. - // The task was moved to another runtime for execution. - // In order not to interrupt tracing, a span needs to be created to continue tracing the current task. - runner.run().trace(span).await - }); + if !self.manager_ctx.spawn_runner_task(procedure_id, || { + common_runtime::spawn_global(async move { + let span = tracing_context.attach(tracing::info_span!( + "LocalManager::submit_subprocedure", + procedure_name = %runner.meta.type_name, + procedure_id = %runner.meta.id, + parent_id = %parent_id, + )); + // Run the root procedure. + // The task was moved to another runtime for execution. + // In order not to interrupt tracing, a span needs to be created to continue tracing the current task. + runner.run().trace(span).await + }) + }) { + self.manager_ctx.remove_procedure(procedure_id); + return; + } + + // Add the id of the subprocedure to the metadata. + self.meta.push_child(procedure_id); } /// Extend the retry time to wait for the next retry. @@ -702,6 +718,12 @@ impl Runner { } } +impl Drop for Runner { + fn drop(&mut self) { + self.manager_ctx.remove_runner_task(self.meta.id); + } +} + #[cfg(test)] mod tests { use std::assert_matches; diff --git a/src/common/procedure/src/rwlock.rs b/src/common/procedure/src/rwlock.rs index cbdfe30977..c4807cf2f7 100644 --- a/src/common/procedure/src/rwlock.rs +++ b/src/common/procedure/src/rwlock.rs @@ -106,6 +106,13 @@ where locks.remove(key); } } + + /// Clears all key locks. + /// + /// Callers must ensure no tasks are holding or waiting for these locks. + pub fn clear(&self) { + self.inner.lock().unwrap().clear(); + } } #[cfg(test)]