From 1d5291b06d08c98ac11e0d682a0d379961a00f5c Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Thu, 11 Dec 2025 10:30:32 +0800 Subject: [PATCH] fix(procedure): update procedure state correctly during execution and on failure (#7376) Signed-off-by: WenyXu --- src/common/procedure/src/local/runner.rs | 138 +++++++++++++++++++---- 1 file changed, 117 insertions(+), 21 deletions(-) diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs index 9135bee9b4..f1c70c78c3 100644 --- a/src/common/procedure/src/local/runner.rs +++ b/src/common/procedure/src/local/runner.rs @@ -331,8 +331,29 @@ impl Runner { } match status { - Status::Executing { .. } => {} + Status::Executing { .. } => { + let prev_state = self.meta.state(); + if !matches!(prev_state, ProcedureState::Running) { + info!( + "Set Procedure {}-{} state to running, prev_state: {:?}", + self.procedure.type_name(), + self.meta.id, + prev_state + ); + self.meta.set_state(ProcedureState::Running); + } + } Status::Suspended { subprocedures, .. } => { + let prev_state = self.meta.state(); + if !matches!(prev_state, ProcedureState::Running) { + info!( + "Set Procedure {}-{} state to running, prev_state: {:?}", + self.procedure.type_name(), + self.meta.id, + prev_state + ); + self.meta.set_state(ProcedureState::Running); + } self.on_suspended(subprocedures).await; } Status::Done { output } => { @@ -393,8 +414,12 @@ impl Runner { return; } - self.meta - .set_state(ProcedureState::prepare_rollback(Arc::new(e))); + if self.procedure.rollback_supported() { + self.meta + .set_state(ProcedureState::prepare_rollback(Arc::new(e))); + } else { + self.meta.set_state(ProcedureState::failed(Arc::new(e))); + } } } } @@ -1080,20 +1105,10 @@ mod tests { let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store.clone()); runner.manager_ctx.start(); - runner.execute_once(&ctx).await; - let state = runner.meta.state(); - assert!(state.is_prepare_rollback(), "{state:?}"); - runner.execute_once(&ctx).await; let state = runner.meta.state(); assert!(state.is_failed(), "{state:?}"); - check_files( - &object_store, - &procedure_store, - ctx.procedure_id, - &["0000000000.rollback"], - ) - .await; + check_files(&object_store, &procedure_store, ctx.procedure_id, &[]).await; } #[tokio::test] @@ -1146,6 +1161,8 @@ mod tests { async move { if times == 1 { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) + } else if times == 2 { + Ok(Status::executing(false)) } else { Ok(Status::done()) } @@ -1172,6 +1189,10 @@ mod tests { let state = runner.meta.state(); assert!(state.is_retrying(), "{state:?}"); + runner.execute_once(&ctx).await; + let state = runner.meta.state(); + assert!(state.is_running(), "{state:?}"); + runner.execute_once(&ctx).await; let state = runner.meta.state(); assert!(state.is_done(), "{state:?}"); @@ -1185,6 +1206,86 @@ mod tests { .await; } + #[tokio::test(flavor = "multi_thread")] + async fn test_execute_on_retry_later_error_with_child() { + common_telemetry::init_default_ut_logging(); + let mut times = 0; + let child_id = ProcedureId::random(); + + let exec_fn = move |_| { + times += 1; + async move { + debug!("times: {}", times); + if times == 1 { + Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) + } else if times == 2 { + let exec_fn = |_| { + async { Err(Error::external(MockError::new(StatusCode::Unexpected))) } + .boxed() + }; + let fail = ProcedureAdapter { + data: "fail".to_string(), + lock_key: LockKey::single_exclusive("catalog.schema.table.region-0"), + poison_keys: PoisonKeys::default(), + exec_fn, + rollback_fn: None, + }; + + Ok(Status::Suspended { + subprocedures: vec![ProcedureWithId { + id: child_id, + procedure: Box::new(fail), + }], + persist: true, + }) + } else { + Ok(Status::done()) + } + } + .boxed() + }; + + let retry_later = ProcedureAdapter { + data: "retry_later".to_string(), + lock_key: LockKey::single_exclusive("catalog.schema.table"), + poison_keys: PoisonKeys::default(), + exec_fn, + rollback_fn: None, + }; + + let dir = create_temp_dir("retry_later"); + let meta = retry_later.new_meta(ROOT_ID); + let ctx = context_without_provider(meta.id); + let object_store = test_util::new_object_store(&dir); + let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); + let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone()); + runner.manager_ctx.start(); + debug!("execute_once 1"); + runner.execute_once(&ctx).await; + let state = runner.meta.state(); + assert!(state.is_retrying(), "{state:?}"); + + let moved_meta = meta.clone(); + tokio::spawn(async move { + moved_meta.child_notify.notify_one(); + }); + runner.execute_once(&ctx).await; + let state = runner.meta.state(); + assert!(state.is_running(), "{state:?}"); + + runner.execute_once(&ctx).await; + let state = runner.meta.state(); + assert!(state.is_done(), "{state:?}"); + assert!(meta.state().is_done()); + check_files( + &object_store, + &procedure_store, + ctx.procedure_id, + &["0000000000.step", "0000000001.commit"], + ) + .await; + } + #[tokio::test] async fn test_execute_exceed_max_retry_later() { let exec_fn = @@ -1304,7 +1405,7 @@ mod tests { async fn test_child_error() { let mut times = 0; let child_id = ProcedureId::random(); - + common_telemetry::init_default_ut_logging(); let exec_fn = move |ctx: Context| { times += 1; async move { @@ -1529,7 +1630,7 @@ mod tests { runner.execute_once(&ctx).await; let state = runner.meta.state(); - assert!(state.is_prepare_rollback(), "{state:?}"); + assert!(state.is_failed(), "{state:?}"); let procedure_id = runner .manager_ctx @@ -1596,11 +1697,6 @@ mod tests { let state = runner.meta.state(); assert!(state.is_running(), "{state:?}"); - runner.execute_once(&ctx).await; - let state = runner.meta.state(); - assert!(state.is_prepare_rollback(), "{state:?}"); - assert!(meta.state().is_prepare_rollback()); - runner.execute_once(&ctx).await; let state = runner.meta.state(); assert!(state.is_failed(), "{state:?}");