fix: reset procedure manager state on stop (#8174)

* fix: reset procedure manager state on stop

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: fix unit tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2026-05-28 11:45:04 +08:00
committed by GitHub
parent 91ac84019b
commit 123524474d
3 changed files with 310 additions and 34 deletions

View File

@@ -24,7 +24,7 @@ use async_trait::async_trait;
use backon::ExponentialBuilder;
use common_error::ext::BoxedError;
use common_event_recorder::EventRecorderRef;
use common_runtime::{RepeatedTask, TaskFunction};
use common_runtime::{JoinHandle, RepeatedTask, TaskFunction};
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::{error, info, tracing};
use snafu::{OptionExt, ResultExt, ensure};
@@ -254,6 +254,8 @@ pub(crate) struct ManagerContext {
running_procedures: Mutex<HashSet<ProcedureId>>,
/// Ids and finished time of finished procedures.
finished_procedures: Mutex<VecDeque<(ProcedureId, Instant)>>,
/// Runner tasks of procedures.
runner_tasks: Mutex<HashMap<ProcedureId, JoinHandle<()>>>,
/// Running flag.
running: Arc<AtomicBool>,
/// Poison manager.
@@ -310,6 +312,7 @@ impl ManagerContext {
procedures: RwLock::new(HashMap::new()),
running_procedures: Mutex::new(HashSet::new()),
finished_procedures: Mutex::new(VecDeque::new()),
runner_tasks: Mutex::new(HashMap::new()),
running: Arc::new(AtomicBool::new(false)),
poison_manager,
}
@@ -329,6 +332,76 @@ impl ManagerContext {
self.running.store(false, Ordering::Relaxed);
}
/// Clears all in-memory runtime state tracked by this context.
///
/// Empties the procedure map, the running and finished procedure collections,
/// requests an abort of every tracked runner task, and finally clears both key
/// lock tables. This method is synchronous: it does not await the aborted
/// handles.
fn reset_runtime_state(&self) {
    self.procedures.write().unwrap().clear();
    self.running_procedures.lock().unwrap().clear();
    self.finished_procedures.lock().unwrap().clear();
    // Reuse the shared drain helper so the `runner_tasks` lock is released
    // before any handle is aborted.
    for handle in self.take_runner_tasks() {
        handle.abort();
    }
    self.key_lock.clear();
    self.dynamic_key_lock.clear();
}
/// Spawns the runner task of a procedure and records its handle.
///
/// `spawn` is only invoked when the running flag is set. Returns `true` if the
/// task was spawned and stored under `procedure_id`, `false` if nothing was
/// spawned.
fn spawn_runner_task<F>(&self, procedure_id: ProcedureId, spawn: F) -> bool
where
    F: FnOnce() -> JoinHandle<()>,
{
    // The lock is taken before the running flag is read, and the handle is
    // inserted under the same guard.
    // NOTE(review): presumably this keeps a concurrent stop from missing a
    // freshly spawned task — confirm against `stop`.
    let mut runner_tasks = self.runner_tasks.lock().unwrap();
    let is_running = self.running();
    if is_running {
        let _ = runner_tasks.insert(procedure_id, spawn());
    }
    is_running
}
/// Removes `procedure_id` from the procedure map and from the set of running
/// procedures.
fn remove_procedure(&self, procedure_id: ProcedureId) {
    {
        // Scope the write guard so it is released before the next lock is taken.
        let mut procedures = self.procedures.write().unwrap();
        procedures.remove(&procedure_id);
    }
    let mut running = self.running_procedures.lock().unwrap();
    running.remove(&procedure_id);
}
/// Stops tracking the runner task registered under `procedure_id`, if any.
///
/// The removed handle is dropped, not aborted.
pub(crate) fn remove_runner_task(&self, procedure_id: ProcedureId) {
    let mut runner_tasks = self.runner_tasks.lock().unwrap();
    let _ = runner_tasks.remove(&procedure_id);
}
/// Drains every tracked runner task and returns the handles, leaving the
/// tracking map empty.
fn take_runner_tasks(&self) -> Vec<JoinHandle<()>> {
    let mut runner_tasks = self.runner_tasks.lock().unwrap();
    let mut handles = Vec::with_capacity(runner_tasks.len());
    handles.extend(runner_tasks.drain().map(|(_, handle)| handle));
    handles
}
async fn abort_runner_tasks(&self) {
let handles = self.take_runner_tasks();
for handle in &handles {
handle.abort();
}
for handle in handles {
if let Err(e) = handle.await
&& !e.is_cancelled()
{
error!(
e; "Procedure runner task exits unexpectedly during stop",
);
}
}
}
/// Returns whether the `ProcedureManager` is running.
pub(crate) fn running(&self) -> bool {
self.running.load(Ordering::Relaxed)
@@ -675,17 +748,25 @@ impl LocalManager {
let tracing_context = TracingContext::from_current_span();
let _handle = common_runtime::spawn_global(async move {
let span = tracing_context.attach(tracing::info_span!(
"LocalManager::submit_root_procedure",
procedure_name = %runner.meta.type_name,
procedure_id = %runner.meta.id,
));
// Run the root procedure.
// The task was moved to another runtime for execution.
// In order not to interrupt tracing, a span needs to be created to continue tracing the current task.
runner.run().trace(span).await;
});
ensure!(
self.manager_ctx.spawn_runner_task(procedure_id, || {
common_runtime::spawn_global(async move {
let span = tracing_context.attach(tracing::info_span!(
"LocalManager::submit_root_procedure",
procedure_name = %runner.meta.type_name,
procedure_id = %runner.meta.id,
));
// Run the root procedure.
// The task was moved to another runtime for execution.
// In order not to interrupt tracing, a span needs to be created to continue tracing the current task.
runner.run().trace(span).await;
})
}),
{
self.manager_ctx.remove_procedure(procedure_id);
ManagerNotStartSnafu
}
);
Ok(watcher)
}
@@ -822,6 +903,7 @@ impl ProcedureManager for LocalManager {
*task = Some(task_inner);
self.manager_ctx.reset_runtime_state();
self.manager_ctx.start();
info!("LocalManager is start.");
@@ -830,14 +912,18 @@ impl ProcedureManager for LocalManager {
}
async fn stop(&self) -> Result<()> {
let mut task = self.remove_outdated_meta_task.lock().await;
if let Some(task) = task.take() {
task.stop().await.context(StopRemoveOutdatedMetaTaskSnafu)?;
}
self.manager_ctx.stop();
let mut task = self.remove_outdated_meta_task.lock().await;
if let Some(task) = task.take()
&& let Err(e) = task.stop().await.context(StopRemoveOutdatedMetaTaskSnafu)
{
error!(e; "Failed to stop remove outdated meta task");
};
self.manager_ctx.abort_runner_tasks().await;
self.manager_ctx.reset_runtime_state();
info!("LocalManager is stopped.");
Ok(())
@@ -921,10 +1007,12 @@ pub(crate) mod test_util {
#[cfg(test)]
mod tests {
use std::assert_matches;
use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
use common_error::mock::MockError;
use common_error::status_code::StatusCode;
use common_test_util::temp_dir::create_temp_dir;
use tokio::sync::oneshot;
use tokio::time::timeout;
use super::*;
@@ -954,6 +1042,67 @@ mod tests {
assert!(ctx.state(meta.id).unwrap().is_done());
}
/// Populates every runtime collection of a manager context and checks that
/// `reset_runtime_state` leaves all of them empty.
#[test]
fn test_reset_runtime_state() {
    const LOCK_KEY: &str = "test.reset_runtime_state";
    const DYNAMIC_LOCK_KEY: &str = "test.reset_runtime_state.dynamic";

    let ctx = new_test_manager_context();
    ctx.set_running();

    let meta = {
        let mut meta = test_util::procedure_meta_for_test();
        meta.lock_key = LockKey::single_exclusive(LOCK_KEY);
        Arc::new(meta)
    };
    let procedure_id = meta.id;

    // Seed the procedure map, the finished queue and the runner task map.
    assert!(ctx.try_insert_procedure(meta.clone()));
    {
        let mut finished = ctx.finished_procedures.lock().unwrap();
        finished.push_back((procedure_id, Instant::now()));
    }
    ctx.spawn_runner_task(procedure_id, || {
        common_runtime::spawn_global(std::future::pending::<()>())
    });

    // Touch both key lock tables so that each of them holds one entry.
    drop(ctx.key_lock.try_write(LOCK_KEY.to_string()));
    drop(ctx.dynamic_key_lock.try_write(DYNAMIC_LOCK_KEY.to_string()));

    assert!(ctx.contains_procedure(procedure_id));
    assert_eq!(1, ctx.running_procedures.lock().unwrap().len());
    assert_eq!(1, ctx.finished_procedures.lock().unwrap().len());
    assert_eq!(1, ctx.runner_tasks.lock().unwrap().len());
    assert_eq!(1, ctx.key_lock.len());
    assert_eq!(1, ctx.dynamic_key_lock.len());

    ctx.reset_runtime_state();

    assert!(!ctx.contains_procedure(procedure_id));
    assert!(ctx.running_procedures.lock().unwrap().is_empty());
    assert!(ctx.finished_procedures.lock().unwrap().is_empty());
    assert!(ctx.runner_tasks.lock().unwrap().is_empty());
    assert!(ctx.key_lock.is_empty());
    assert!(ctx.dynamic_key_lock.is_empty());
}
/// A context that has not been marked as running must refuse to spawn a runner
/// task: the spawn closure is never invoked and nothing is tracked.
#[test]
fn test_spawn_runner_task_not_started_after_stop() {
    let ctx = new_test_manager_context();
    let spawned = Arc::new(AtomicBool::new(false));

    let started = {
        let flag = spawned.clone();
        ctx.spawn_runner_task(ProcedureId::random(), move || {
            common_runtime::spawn_global(async move {
                flag.store(true, AtomicOrdering::Relaxed);
            })
        })
    };

    assert!(!started);
    assert!(!spawned.load(AtomicOrdering::Relaxed));
    assert!(ctx.runner_tasks.lock().unwrap().is_empty());
}
#[test]
fn test_manager_context_insert_duplicate() {
let ctx = new_test_manager_context();
@@ -1046,6 +1195,105 @@ mod tests {
}
}
/// Test procedure whose `execute` never completes.
///
/// It signals once through `started_tx` when execution begins and sets
/// `dropped` when the value is dropped.
#[derive(Debug)]
struct BlockingProcedure {
    /// One-shot notifier fired on the first call to `execute`.
    started_tx: Option<oneshot::Sender<()>>,
    /// Set to `true` by the `Drop` implementation.
    dropped: Arc<AtomicBool>,
    /// Lock key reported by `lock_key`.
    lock_key: LockKey,
}

impl Drop for BlockingProcedure {
    /// Records that the procedure has been dropped.
    fn drop(&mut self) {
        self.dropped.store(true, AtomicOrdering::Relaxed);
    }
}

#[async_trait]
impl Procedure for BlockingProcedure {
    fn type_name(&self) -> &str {
        "BlockingProcedure"
    }

    /// Notifies the test that execution started, then waits forever.
    async fn execute(&mut self, _ctx: &Context) -> Result<Status> {
        let notifier = self.started_tx.take();
        if let Some(started_tx) = notifier {
            // The receiver may already be gone; a failed send is ignored.
            let _ = started_tx.send(());
        }

        std::future::pending::<Result<Status>>().await
    }

    fn dump(&self) -> Result<String> {
        Ok(String::new())
    }

    fn lock_key(&self) -> LockKey {
        self.lock_key.clone()
    }
}
/// Submits a procedure that never finishes, stops the manager, and checks that
/// the procedure was dropped and every runtime collection is empty afterwards.
#[tokio::test]
async fn test_stop_aborts_runner_and_resets_runtime_state() {
    let dir = create_temp_dir("stop_aborts_runner_and_resets_runtime_state");
    let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
    let poison_manager = Arc::new(InMemoryPoisonStore::new());
    let manager = LocalManager::new(
        ManagerConfig::default(),
        state_store,
        poison_manager,
        None,
        None,
    );
    manager.start().await.unwrap();
    let ctx = &manager.manager_ctx;

    let procedure_id = ProcedureId::random();
    let (started_tx, started_rx) = oneshot::channel();
    let dropped = Arc::new(AtomicBool::new(false));
    let blocking = ProcedureWithId {
        id: procedure_id,
        procedure: Box::new(BlockingProcedure {
            started_tx: Some(started_tx),
            dropped: dropped.clone(),
            lock_key: LockKey::single_exclusive("test.stop_aborts_runner"),
        }),
    };
    manager.submit(blocking).await.unwrap();

    // Wait until the procedure is actually executing before inspecting state.
    timeout(Duration::from_secs(5), started_rx)
        .await
        .unwrap()
        .unwrap();

    assert!(ctx.contains_procedure(procedure_id));
    assert_eq!(1, ctx.running_procedures.lock().unwrap().len());
    assert_eq!(1, ctx.runner_tasks.lock().unwrap().len());
    assert_eq!(1, ctx.key_lock.len());

    manager.stop().await.unwrap();

    assert!(dropped.load(AtomicOrdering::Relaxed));
    assert!(!ctx.running());
    assert!(!ctx.contains_procedure(procedure_id));
    assert!(ctx.running_procedures.lock().unwrap().is_empty());
    assert!(ctx.finished_procedures.lock().unwrap().is_empty());
    assert!(ctx.runner_tasks.lock().unwrap().is_empty());
    assert!(ctx.key_lock.is_empty());
    assert!(ctx.dynamic_key_lock.is_empty());
}
#[test]
fn test_register_loader() {
let dir = create_temp_dir("register");
@@ -1439,7 +1687,7 @@ mod tests {
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager, None, None);
manager.manager_ctx.set_running();
manager.start().await.unwrap();
manager
.manager_ctx
@@ -1447,7 +1695,6 @@ mod tests {
.lock()
.unwrap()
.insert(ProcedureId::random());
manager.start().await.unwrap();
// Submit a new procedure should fail.
let mut procedure = ProcedureToLoad::new("submit");

View File

@@ -20,6 +20,7 @@ use backon::{BackoffBuilder, ExponentialBuilder};
use common_error::ext::PlainError;
use common_error::status_code::StatusCode;
use common_event_recorder::EventRecorderRef;
use common_telemetry::tracing::warn;
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::{debug, error, info, tracing};
use rand::Rng;
@@ -480,6 +481,15 @@ impl Runner {
procedure_state: ProcedureState,
procedure: BoxedProcedure,
) {
if !self.running() {
warn!(
"ProcedureManager is not running, skip submitting subprocedure {}-{}",
procedure.type_name(),
procedure_id
);
return;
}
if self.manager_ctx.contains_procedure(procedure_id) {
// If the parent has already submitted this procedure, don't submit it again.
return;
@@ -520,23 +530,29 @@ impl Runner {
procedure_id,
);
// Add the id of the subprocedure to the metadata.
self.meta.push_child(procedure_id);
let parent_id = self.meta.id;
let tracing_context = TracingContext::from_current_span();
let _handle = common_runtime::spawn_global(async move {
let span = tracing_context.attach(tracing::info_span!(
"LocalManager::submit_subprocedure",
procedure_name = %runner.meta.type_name,
procedure_id = %runner.meta.id,
parent_id = %parent_id,
));
// Run the root procedure.
// The task was moved to another runtime for execution.
// In order not to interrupt tracing, a span needs to be created to continue tracing the current task.
runner.run().trace(span).await
});
if !self.manager_ctx.spawn_runner_task(procedure_id, || {
common_runtime::spawn_global(async move {
let span = tracing_context.attach(tracing::info_span!(
"LocalManager::submit_subprocedure",
procedure_name = %runner.meta.type_name,
procedure_id = %runner.meta.id,
parent_id = %parent_id,
));
// Run the subprocedure.
// The task was moved to another runtime for execution.
// In order not to interrupt tracing, a span needs to be created to continue tracing the current task.
runner.run().trace(span).await
})
}) {
self.manager_ctx.remove_procedure(procedure_id);
return;
}
// Add the id of the subprocedure to the metadata.
self.meta.push_child(procedure_id);
}
/// Extend the retry time to wait for the next retry.
@@ -702,6 +718,12 @@ impl Runner {
}
}
impl Drop for Runner {
    /// Removes this runner's task handle from the manager context when the
    /// runner is dropped.
    fn drop(&mut self) {
        let procedure_id = self.meta.id;
        self.manager_ctx.remove_runner_task(procedure_id);
    }
}
#[cfg(test)]
mod tests {
use std::assert_matches;

View File

@@ -106,6 +106,13 @@ where
locks.remove(key);
}
}
/// Removes every entry from the key lock table.
///
/// Callers must guarantee that no task currently holds, or is waiting for, any
/// of these locks.
pub fn clear(&self) {
    let mut locks = self.inner.lock().unwrap();
    locks.clear();
}
}
#[cfg(test)]