fix(procedure): update procedure state correctly during execution and on failure (#7376)

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2025-12-11 10:30:32 +08:00
committed by GitHub
parent 564cc0c750
commit 1d5291b06d

View File

@@ -331,8 +331,29 @@ impl Runner {
}
match status {
Status::Executing { .. } => {}
Status::Executing { .. } => {
let prev_state = self.meta.state();
if !matches!(prev_state, ProcedureState::Running) {
info!(
"Set Procedure {}-{} state to running, prev_state: {:?}",
self.procedure.type_name(),
self.meta.id,
prev_state
);
self.meta.set_state(ProcedureState::Running);
}
}
Status::Suspended { subprocedures, .. } => {
let prev_state = self.meta.state();
if !matches!(prev_state, ProcedureState::Running) {
info!(
"Set Procedure {}-{} state to running, prev_state: {:?}",
self.procedure.type_name(),
self.meta.id,
prev_state
);
self.meta.set_state(ProcedureState::Running);
}
self.on_suspended(subprocedures).await;
}
Status::Done { output } => {
@@ -393,8 +414,12 @@ impl Runner {
return;
}
self.meta
.set_state(ProcedureState::prepare_rollback(Arc::new(e)));
if self.procedure.rollback_supported() {
self.meta
.set_state(ProcedureState::prepare_rollback(Arc::new(e)));
} else {
self.meta.set_state(ProcedureState::failed(Arc::new(e)));
}
}
}
}
@@ -1080,20 +1105,10 @@ mod tests {
let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store.clone());
runner.manager_ctx.start();
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_prepare_rollback(), "{state:?}");
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_failed(), "{state:?}");
check_files(
&object_store,
&procedure_store,
ctx.procedure_id,
&["0000000000.rollback"],
)
.await;
check_files(&object_store, &procedure_store, ctx.procedure_id, &[]).await;
}
#[tokio::test]
@@ -1146,6 +1161,8 @@ mod tests {
async move {
if times == 1 {
Err(Error::retry_later(MockError::new(StatusCode::Unexpected)))
} else if times == 2 {
Ok(Status::executing(false))
} else {
Ok(Status::done())
}
@@ -1172,6 +1189,10 @@ mod tests {
let state = runner.meta.state();
assert!(state.is_retrying(), "{state:?}");
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_running(), "{state:?}");
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_done(), "{state:?}");
@@ -1185,6 +1206,86 @@ mod tests {
.await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_execute_on_retry_later_error_with_child() {
common_telemetry::init_default_ut_logging();
let mut times = 0;
let child_id = ProcedureId::random();
let exec_fn = move |_| {
times += 1;
async move {
debug!("times: {}", times);
if times == 1 {
Err(Error::retry_later(MockError::new(StatusCode::Unexpected)))
} else if times == 2 {
let exec_fn = |_| {
async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }
.boxed()
};
let fail = ProcedureAdapter {
data: "fail".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table.region-0"),
poison_keys: PoisonKeys::default(),
exec_fn,
rollback_fn: None,
};
Ok(Status::Suspended {
subprocedures: vec![ProcedureWithId {
id: child_id,
procedure: Box::new(fail),
}],
persist: true,
})
} else {
Ok(Status::done())
}
}
.boxed()
};
let retry_later = ProcedureAdapter {
data: "retry_later".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
poison_keys: PoisonKeys::default(),
exec_fn,
rollback_fn: None,
};
let dir = create_temp_dir("retry_later");
let meta = retry_later.new_meta(ROOT_ID);
let ctx = context_without_provider(meta.id);
let object_store = test_util::new_object_store(&dir);
let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone());
runner.manager_ctx.start();
debug!("execute_once 1");
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_retrying(), "{state:?}");
let moved_meta = meta.clone();
tokio::spawn(async move {
moved_meta.child_notify.notify_one();
});
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_running(), "{state:?}");
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_done(), "{state:?}");
assert!(meta.state().is_done());
check_files(
&object_store,
&procedure_store,
ctx.procedure_id,
&["0000000000.step", "0000000001.commit"],
)
.await;
}
#[tokio::test]
async fn test_execute_exceed_max_retry_later() {
let exec_fn =
@@ -1304,7 +1405,7 @@ mod tests {
async fn test_child_error() {
let mut times = 0;
let child_id = ProcedureId::random();
common_telemetry::init_default_ut_logging();
let exec_fn = move |ctx: Context| {
times += 1;
async move {
@@ -1529,7 +1630,7 @@ mod tests {
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_prepare_rollback(), "{state:?}");
assert!(state.is_failed(), "{state:?}");
let procedure_id = runner
.manager_ctx
@@ -1596,11 +1697,6 @@ mod tests {
let state = runner.meta.state();
assert!(state.is_running(), "{state:?}");
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_prepare_rollback(), "{state:?}");
assert!(meta.state().is_prepare_rollback());
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_failed(), "{state:?}");