feat(procedure): support to rollback (#3625)

* feat: add rollback method

* refactor: simplify the state control

* feat(procedure): support to rollback

* test: add tests for rollback

* feat: persist rollback procedure state

* feat: rollback procedure after restarting

* feat: add `CommitRollback`, `RollingBack` to ProcedureStateResponse

* chore: apply suggestions from CR

* feat: persist rollback error

* feat: add `is_support_rollback`

* chore: apply suggestions from CR

* chore: update greptime-proto

* chore: rename to `rollback_supported`

* chore: rename to `RollbackProcedureRecovered`
This commit is contained in:
Weny Xu
2024-04-08 19:23:23 +08:00
committed by GitHub
parent 9038e1b769
commit 32b9639d7c
9 changed files with 562 additions and 204 deletions

2
Cargo.lock generated
View File

@@ -3781,7 +3781,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=1bd2398b686e5ac6c1eef6daf615867ce27f75c1#1bd2398b686e5ac6c1eef6daf615867ce27f75c1"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=c00c73b76ed20a603e4d29fb96cb4e6ef987bba7#c00c73b76ed20a603e4d29fb96cb4e6ef987bba7"
dependencies = [
"prost 0.12.3",
"serde",

View File

@@ -104,7 +104,7 @@ etcd-client = "0.12"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "1bd2398b686e5ac6c1eef6daf615867ce27f75c1" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "c00c73b76ed20a603e4d29fb96cb4e6ef987bba7" }
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"

View File

@@ -56,6 +56,12 @@ pub fn procedure_state_to_pb_response(state: &ProcedureState) -> PbProcedureStat
ProcedureState::Done { .. } => (PbProcedureStatus::Done, String::default()),
ProcedureState::Retrying { error } => (PbProcedureStatus::Retrying, error.to_string()),
ProcedureState::Failed { error } => (PbProcedureStatus::Failed, error.to_string()),
ProcedureState::PrepareRollback { error } => {
(PbProcedureStatus::PrepareRollback, error.to_string())
}
ProcedureState::RollingBack { error } => {
(PbProcedureStatus::RollingBack, error.to_string())
}
};
PbProcedureStateResponse {

View File

@@ -104,12 +104,24 @@ pub enum Error {
location: Location,
},
#[snafu(display("Rollback Procedure recovered: {error}"))]
RollbackProcedureRecovered { error: String, location: Location },
#[snafu(display("Procedure retry exceeded max times, procedure_id: {}", procedure_id))]
RetryTimesExceeded {
source: Arc<Error>,
procedure_id: ProcedureId,
},
#[snafu(display(
"Procedure rollback exceeded max times, procedure_id: {}",
procedure_id
))]
RollbackTimesExceeded {
source: Arc<Error>,
procedure_id: ProcedureId,
},
#[snafu(display("Corrupted data, error: "))]
CorruptedData {
#[snafu(source)]
@@ -145,6 +157,9 @@ pub enum Error {
#[snafu(display("Unexpected: {err_msg}"))]
Unexpected { location: Location, err_msg: String },
#[snafu(display("Not support to rollback the procedure"))]
RollbackNotSupported { location: Location },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -161,9 +176,12 @@ impl ErrorExt for Error {
| Error::DeleteState { .. }
| Error::FromJson { .. }
| Error::RetryTimesExceeded { .. }
| Error::RollbackTimesExceeded { .. }
| Error::RetryLater { .. }
| Error::WaitWatcher { .. }
| Error::ManagerNotStart { .. } => StatusCode::Internal,
| Error::ManagerNotStart { .. }
| Error::RollbackProcedureRecovered { .. }
| Error::RollbackNotSupported { .. } => StatusCode::Internal,
Error::LoaderConflict { .. } | Error::DuplicateProcedure { .. } => {
StatusCode::InvalidArguments
}

View File

@@ -31,11 +31,11 @@ use tokio::sync::{Mutex as TokioMutex, Notify};
use self::rwlock::KeyRwLock;
use crate::error::{
DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ManagerNotStartSnafu, Result,
self, DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ManagerNotStartSnafu, Result,
StartRemoveOutdatedMetaTaskSnafu, StopRemoveOutdatedMetaTaskSnafu,
};
use crate::local::runner::Runner;
use crate::procedure::BoxedProcedureLoader;
use crate::procedure::{BoxedProcedureLoader, InitProcedureState};
use crate::store::{ProcedureMessage, ProcedureStore, StateStoreRef};
use crate::{
BoxedProcedure, ContextProvider, LockKey, ProcedureId, ProcedureManager, ProcedureState,
@@ -72,8 +72,13 @@ pub(crate) struct ProcedureMeta {
}
impl ProcedureMeta {
fn new(id: ProcedureId, parent_id: Option<ProcedureId>, lock_key: LockKey) -> ProcedureMeta {
let (state_sender, state_receiver) = watch::channel(ProcedureState::Running);
fn new(
id: ProcedureId,
procedure_state: ProcedureState,
parent_id: Option<ProcedureId>,
lock_key: LockKey,
) -> ProcedureMeta {
let (state_sender, state_receiver) = watch::channel(procedure_state);
ProcedureMeta {
id,
parent_id,
@@ -424,12 +429,18 @@ impl LocalManager {
fn submit_root(
&self,
procedure_id: ProcedureId,
procedure_state: ProcedureState,
step: u32,
procedure: BoxedProcedure,
) -> Result<Watcher> {
ensure!(self.manager_ctx.running(), ManagerNotStartSnafu);
let meta = Arc::new(ProcedureMeta::new(procedure_id, None, procedure.lock_key()));
let meta = Arc::new(ProcedureMeta::new(
procedure_id,
procedure_state,
None,
procedure.lock_key(),
));
let runner = Runner {
meta: meta.clone(),
procedure,
@@ -468,13 +479,11 @@ impl LocalManager {
Ok(watcher)
}
/// Recovers unfinished procedures and reruns them.
async fn recover(&self) -> Result<()> {
logging::info!("LocalManager start to recover");
let recover_start = Instant::now();
let (messages, finished_ids) = self.procedure_store.load_messages().await?;
fn submit_recovered_messages(
&self,
messages: HashMap<ProcedureId, ProcedureMessage>,
init_state: InitProcedureState,
) {
for (procedure_id, message) in &messages {
if message.parent_id.is_none() {
// This is the root procedure. We only submit the root procedure as it will
@@ -494,8 +503,21 @@ impl LocalManager {
loaded_procedure.step
);
let procedure_state = match init_state {
InitProcedureState::RollingBack => ProcedureState::RollingBack {
error: Arc::new(
error::RollbackProcedureRecoveredSnafu {
error: message.error.clone().unwrap_or("Unknown error".to_string()),
}
.build(),
),
},
InitProcedureState::Running => ProcedureState::Running,
};
if let Err(e) = self.submit_root(
*procedure_id,
procedure_state,
loaded_procedure.step,
loaded_procedure.procedure,
) {
@@ -503,6 +525,18 @@ impl LocalManager {
}
}
}
}
/// Recovers unfinished procedures and reruns them.
async fn recover(&self) -> Result<()> {
logging::info!("LocalManager start to recover");
let recover_start = Instant::now();
let (messages, rollback_messages, finished_ids) =
self.procedure_store.load_messages().await?;
// Submits recovered messages first.
self.submit_recovered_messages(rollback_messages, InitProcedureState::RollingBack);
self.submit_recovered_messages(messages, InitProcedureState::Running);
if !finished_ids.is_empty() {
logging::info!(
@@ -587,7 +621,12 @@ impl ProcedureManager for LocalManager {
DuplicateProcedureSnafu { procedure_id }
);
self.submit_root(procedure.id, 0, procedure.procedure)
self.submit_root(
procedure.id,
ProcedureState::Running,
0,
procedure.procedure,
)
}
async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>> {
@@ -626,7 +665,12 @@ pub(crate) mod test_util {
use super::*;
pub(crate) fn procedure_meta_for_test() -> ProcedureMeta {
ProcedureMeta::new(ProcedureId::random(), None, LockKey::default())
ProcedureMeta::new(
ProcedureId::random(),
ProcedureState::Running,
None,
LockKey::default(),
)
}
pub(crate) fn new_object_store(dir: &TempDir) -> ObjectStore {
@@ -914,6 +958,14 @@ mod tests {
}
}
async fn rollback(&mut self, _: &Context) -> Result<()> {
Ok(())
}
fn rollback_supported(&self) -> bool {
true
}
fn dump(&self) -> Result<String> {
Ok(String::new())
}
@@ -923,24 +975,29 @@ mod tests {
}
}
let check_procedure = |procedure| {
async {
let procedure_id = ProcedureId::random();
let mut watcher = manager
.submit(ProcedureWithId {
id: procedure_id,
procedure: Box::new(procedure),
})
.await
.unwrap();
// Wait for the notification.
watcher.changed().await.unwrap();
assert!(watcher.borrow().is_failed());
}
let check_procedure = |procedure| async {
let procedure_id = ProcedureId::random();
manager
.submit(ProcedureWithId {
id: procedure_id,
procedure: Box::new(procedure),
})
.await
.unwrap()
};
check_procedure(MockProcedure { panic: false }).await;
check_procedure(MockProcedure { panic: true }).await;
let mut watcher = check_procedure(MockProcedure { panic: false }).await;
// Wait for the notification.
watcher.changed().await.unwrap();
assert!(watcher.borrow().is_prepare_rollback());
watcher.changed().await.unwrap();
assert!(watcher.borrow().is_rolling_back());
watcher.changed().await.unwrap();
assert!(watcher.borrow().is_failed());
// The runner won't rollback a panicked procedure.
let mut watcher = check_procedure(MockProcedure { panic: true }).await;
watcher.changed().await.unwrap();
assert!(watcher.borrow().is_failed());
}
#[tokio::test]

View File

@@ -23,37 +23,9 @@ use super::rwlock::OwnedKeyRwLockGuard;
use crate::error::{self, ProcedurePanicSnafu, Result};
use crate::local::{ManagerContext, ProcedureMeta, ProcedureMetaRef};
use crate::procedure::{Output, StringKey};
use crate::store::ProcedureStore;
use crate::ProcedureState::Retrying;
use crate::store::{ProcedureMessage, ProcedureStore};
use crate::{BoxedProcedure, Context, Error, ProcedureId, ProcedureState, ProcedureWithId, Status};
#[derive(Debug)]
enum ExecResult {
Continue,
Done,
RetryLater,
Failed,
}
#[cfg(test)]
impl ExecResult {
fn is_continue(&self) -> bool {
matches!(self, ExecResult::Continue)
}
fn is_done(&self) -> bool {
matches!(self, ExecResult::Done)
}
fn is_retry_later(&self) -> bool {
matches!(self, ExecResult::RetryLater)
}
fn is_failed(&self) -> bool {
matches!(self, ExecResult::Failed)
}
}
/// A guard to cleanup procedure state.
struct ProcedureGuard {
meta: ProcedureMetaRef,
@@ -208,129 +180,164 @@ impl Runner {
async fn execute_once_with_retry(&mut self, ctx: &Context) {
let mut retry = self.exponential_builder.build();
let mut retry_times = 0;
let mut rollback = self.exponential_builder.build();
let mut rollback_times = 0;
loop {
// Don't store state if `ProcedureManager` is stopped.
if !self.running() {
self.meta.set_state(ProcedureState::Failed {
error: Arc::new(error::ManagerNotStartSnafu {}.build()),
});
self.meta.set_state(ProcedureState::failed(Arc::new(
error::ManagerNotStartSnafu {}.build(),
)));
return;
}
match self.execute_once(ctx).await {
ExecResult::Done | ExecResult::Failed => return,
ExecResult::Continue => (),
ExecResult::RetryLater => {
let state = self.meta.state();
match state {
ProcedureState::Running => {}
ProcedureState::Retrying { error } => {
retry_times += 1;
if let Some(d) = retry.next() {
self.wait_on_err(d, retry_times).await;
} else {
assert!(self.meta.state().is_retrying());
if let Retrying { error } = self.meta.state() {
self.meta.set_state(ProcedureState::failed(Arc::new(
self.meta
.set_state(ProcedureState::prepare_rollback(Arc::new(
Error::RetryTimesExceeded {
source: error,
source: error.clone(),
procedure_id: self.meta.id,
},
)))
}
)));
}
}
ProcedureState::PrepareRollback { error }
| ProcedureState::RollingBack { error } => {
rollback_times += 1;
if let Some(d) = rollback.next() {
self.wait_on_err(d, rollback_times).await;
} else {
self.meta.set_state(ProcedureState::failed(Arc::new(
Error::RollbackTimesExceeded {
source: error.clone(),
procedure_id: self.meta.id,
},
)));
return;
}
}
ProcedureState::Done { .. } => return,
ProcedureState::Failed { .. } => return,
}
self.execute_once(ctx).await;
}
}
async fn rollback(&mut self, ctx: &Context, err: Arc<Error>) {
if self.procedure.rollback_supported() {
if let Err(e) = self.procedure.rollback(ctx).await {
self.meta
.set_state(ProcedureState::rolling_back(Arc::new(e)));
return;
}
}
self.meta.set_state(ProcedureState::failed(err));
}
async fn rollback(&mut self, error: Arc<Error>) -> ExecResult {
if let Err(e) = self.rollback_procedure().await {
self.rolling_back = true;
self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
return ExecResult::RetryLater;
async fn prepare_rollback(&mut self, err: Arc<Error>) {
if let Err(e) = self.write_procedure_state(err.to_string()).await {
self.meta
.set_state(ProcedureState::prepare_rollback(Arc::new(e)));
return;
}
if self.procedure.rollback_supported() {
self.meta.set_state(ProcedureState::rolling_back(err));
} else {
self.meta.set_state(ProcedureState::failed(err));
}
self.meta.set_state(ProcedureState::failed(error));
ExecResult::Failed
}
async fn execute_once(&mut self, ctx: &Context) -> ExecResult {
// if rolling_back, there is no need to execute again.
if self.rolling_back {
// We can definitely get the previous error here.
let state = self.meta.state();
let err = state.error().unwrap();
return self.rollback(err.clone()).await;
}
match self.procedure.execute(ctx).await {
Ok(status) => {
logging::debug!(
"Execute procedure {}-{} once, status: {:?}, need_persist: {}",
self.procedure.type_name(),
self.meta.id,
status,
status.need_persist(),
);
async fn execute_once(&mut self, ctx: &Context) {
match self.meta.state() {
ProcedureState::Running | ProcedureState::Retrying { .. } => {
match self.procedure.execute(ctx).await {
Ok(status) => {
logging::debug!(
"Execute procedure {}-{} once, status: {:?}, need_persist: {}",
self.procedure.type_name(),
self.meta.id,
status,
status.need_persist(),
);
// Don't store state if `ProcedureManager` is stopped.
if !self.running() {
self.meta.set_state(ProcedureState::Failed {
error: Arc::new(error::ManagerNotStartSnafu {}.build()),
});
return ExecResult::Failed;
}
if status.need_persist() {
if let Err(err) = self.persist_procedure().await {
self.meta.set_state(ProcedureState::retrying(Arc::new(err)));
return ExecResult::RetryLater;
}
}
match status {
Status::Executing { .. } => (),
Status::Suspended { subprocedures, .. } => {
self.on_suspended(subprocedures).await;
}
Status::Done { output } => {
if let Err(e) = self.commit_procedure().await {
self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
return ExecResult::RetryLater;
// Don't store state if `ProcedureManager` is stopped.
if !self.running() {
self.meta.set_state(ProcedureState::failed(Arc::new(
error::ManagerNotStartSnafu {}.build(),
)));
return;
}
self.done(output);
return ExecResult::Done;
if status.need_persist() {
if let Err(err) = self.persist_procedure().await {
self.meta.set_state(ProcedureState::retrying(Arc::new(err)));
return;
}
}
match status {
Status::Executing { .. } => (),
Status::Suspended { subprocedures, .. } => {
self.on_suspended(subprocedures).await;
}
Status::Done { output } => {
if let Err(e) = self.commit_procedure().await {
self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
return;
}
self.done(output);
}
}
}
Err(e) => {
logging::error!(
e;
"Failed to execute procedure {}-{}, retry: {}",
self.procedure.type_name(),
self.meta.id,
e.is_retry_later(),
);
// Don't store state if `ProcedureManager` is stopped.
if !self.running() {
self.meta.set_state(ProcedureState::failed(Arc::new(
error::ManagerNotStartSnafu {}.build(),
)));
return;
}
if e.is_retry_later() {
self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
return;
}
self.meta
.set_state(ProcedureState::prepare_rollback(Arc::new(e)));
}
}
ExecResult::Continue
}
Err(e) => {
logging::error!(
e;
"Failed to execute procedure {}-{}, retry: {}",
self.procedure.type_name(),
self.meta.id,
e.is_retry_later(),
);
// Don't store state if `ProcedureManager` is stopped.
if !self.running() {
self.meta.set_state(ProcedureState::Failed {
error: Arc::new(error::ManagerNotStartSnafu {}.build()),
});
return ExecResult::Failed;
}
if e.is_retry_later() {
self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
return ExecResult::RetryLater;
}
// Write rollback key so we can skip this procedure while recovering procedures.
self.rollback(Arc::new(e)).await
}
ProcedureState::PrepareRollback { error } => self.prepare_rollback(error).await,
ProcedureState::RollingBack { error } => self.rollback(ctx, error).await,
ProcedureState::Failed { .. } | ProcedureState::Done { .. } => (),
}
}
/// Submit a subprocedure with specific `procedure_id`.
fn submit_subprocedure(&self, procedure_id: ProcedureId, mut procedure: BoxedProcedure) {
fn submit_subprocedure(
&self,
procedure_id: ProcedureId,
procedure_state: ProcedureState,
mut procedure: BoxedProcedure,
) {
if self.manager_ctx.contains_procedure(procedure_id) {
// If the parent has already submitted this procedure, don't submit it again.
return;
@@ -350,6 +357,7 @@ impl Runner {
let meta = Arc::new(ProcedureMeta::new(
procedure_id,
procedure_state,
Some(self.meta.id),
procedure.lock_key(),
));
@@ -407,7 +415,11 @@ impl Runner {
subprocedure.id,
);
self.submit_subprocedure(subprocedure.id, subprocedure.procedure);
self.submit_subprocedure(
subprocedure.id,
ProcedureState::Running,
subprocedure.procedure,
);
}
logging::info!(
@@ -469,9 +481,19 @@ impl Runner {
Ok(())
}
async fn rollback_procedure(&mut self) -> Result<()> {
async fn write_procedure_state(&mut self, error: String) -> Result<()> {
// Persists procedure state
let type_name = self.procedure.type_name().to_string();
let data = self.procedure.dump()?;
let message = ProcedureMessage {
type_name,
data,
parent_id: self.meta.parent_id,
step: self.step,
error: Some(error),
};
self.store
.rollback_procedure(self.meta.id, self.step)
.rollback_procedure(self.meta.id, message)
.await
.map_err(|e| {
logging::error!(
@@ -510,6 +532,7 @@ mod tests {
use futures_util::future::BoxFuture;
use futures_util::FutureExt;
use object_store::ObjectStore;
use tokio::sync::mpsc;
use super::*;
use crate::local::test_util;
@@ -566,11 +589,13 @@ mod tests {
}
}
#[derive(Debug)]
type RollbackFn = Box<dyn FnMut(Context) -> BoxFuture<'static, Result<()>> + Send>;
struct ProcedureAdapter<F> {
data: String,
lock_key: LockKey,
exec_fn: F,
rollback_fn: Option<RollbackFn>,
}
impl<F> ProcedureAdapter<F> {
@@ -597,6 +622,17 @@ mod tests {
f.await
}
async fn rollback(&mut self, ctx: &Context) -> Result<()> {
if let Some(f) = &mut self.rollback_fn {
return (f)(ctx.clone()).await;
}
Ok(())
}
fn rollback_supported(&self) -> bool {
self.rollback_fn.is_some()
}
fn dump(&self) -> Result<String> {
Ok(self.data.clone())
}
@@ -623,6 +659,7 @@ mod tests {
data: "normal".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
exec_fn,
rollback_fn: None,
};
let dir = create_temp_dir("normal");
@@ -633,8 +670,9 @@ mod tests {
let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
runner.manager_ctx.start();
let res = runner.execute_once(&ctx).await;
assert!(res.is_continue(), "{res:?}");
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_running(), "{state:?}");
check_files(
&object_store,
&procedure_store,
@@ -643,8 +681,9 @@ mod tests {
)
.await;
let res = runner.execute_once(&ctx).await;
assert!(res.is_done(), "{res:?}");
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_done(), "{state:?}");
check_files(
&object_store,
&procedure_store,
@@ -684,6 +723,7 @@ mod tests {
data: "suspend".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
exec_fn,
rollback_fn: None,
};
let dir = create_temp_dir("suspend");
@@ -694,8 +734,9 @@ mod tests {
let mut runner = new_runner(meta, Box::new(suspend), procedure_store);
runner.manager_ctx.start();
let res = runner.execute_once(&ctx).await;
assert!(res.is_continue(), "{res:?}");
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_running(), "{state:?}");
}
fn new_child_procedure(procedure_id: ProcedureId, keys: &[&str]) -> ProcedureWithId {
@@ -716,6 +757,7 @@ mod tests {
data: "child".to_string(),
lock_key: LockKey::new_exclusive(keys.iter().map(|k| k.to_string())),
exec_fn,
rollback_fn: None,
};
ProcedureWithId {
@@ -784,6 +826,7 @@ mod tests {
data: "parent".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
exec_fn,
rollback_fn: None,
};
let dir = create_temp_dir("parent");
@@ -830,6 +873,7 @@ mod tests {
data: "normal".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
exec_fn,
rollback_fn: None,
};
let dir = create_temp_dir("test_running_is_stopped");
@@ -840,8 +884,9 @@ mod tests {
let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
runner.manager_ctx.start();
let res = runner.execute_once(&ctx).await;
assert!(res.is_continue(), "{res:?}");
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_running(), "{state:?}");
check_files(
&object_store,
&procedure_store,
@@ -851,8 +896,9 @@ mod tests {
.await;
runner.manager_ctx.stop();
let res = runner.execute_once(&ctx).await;
assert!(res.is_failed());
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_failed(), "{state:?}");
// Shouldn't write any files
check_files(
&object_store,
@@ -871,6 +917,7 @@ mod tests {
data: "fail".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
exec_fn,
rollback_fn: None,
};
let dir = create_temp_dir("test_running_is_stopped_on_error");
@@ -881,8 +928,9 @@ mod tests {
let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone());
runner.manager_ctx.stop();
let res = runner.execute_once(&ctx).await;
assert!(res.is_failed(), "{res:?}");
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_failed(), "{state:?}");
// Shouldn't write any files
check_files(&object_store, &procedure_store, ctx.procedure_id, &[]).await;
}
@@ -895,6 +943,7 @@ mod tests {
data: "fail".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
exec_fn,
rollback_fn: None,
};
let dir = create_temp_dir("fail");
@@ -905,9 +954,53 @@ mod tests {
let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store.clone());
runner.manager_ctx.start();
let res = runner.execute_once(&ctx).await;
assert!(res.is_failed(), "{res:?}");
assert!(meta.state().is_failed());
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_prepare_rollback(), "{state:?}");
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_failed(), "{state:?}");
check_files(
&object_store,
&procedure_store,
ctx.procedure_id,
&["0000000000.rollback"],
)
.await;
}
#[tokio::test]
async fn test_execute_with_rollback_on_error() {
let exec_fn =
|_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed();
let rollback_fn = move |_| async move { Ok(()) }.boxed();
let fail = ProcedureAdapter {
data: "fail".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
exec_fn,
rollback_fn: Some(Box::new(rollback_fn)),
};
let dir = create_temp_dir("fail");
let meta = fail.new_meta(ROOT_ID);
let ctx = context_without_provider(meta.id);
let object_store = test_util::new_object_store(&dir);
let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store.clone());
runner.manager_ctx.start();
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_prepare_rollback(), "{state:?}");
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_rolling_back(), "{state:?}");
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_failed(), "{state:?}");
check_files(
&object_store,
&procedure_store,
@@ -937,6 +1030,7 @@ mod tests {
data: "retry_later".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
exec_fn,
rollback_fn: None,
};
let dir = create_temp_dir("retry_later");
@@ -946,13 +1040,13 @@ mod tests {
let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone());
runner.manager_ctx.start();
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_retrying(), "{state:?}");
let res = runner.execute_once(&ctx).await;
assert!(res.is_retry_later(), "{res:?}");
assert!(meta.state().is_retrying());
let res = runner.execute_once(&ctx).await;
assert!(res.is_done(), "{res:?}");
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_done(), "{state:?}");
assert!(meta.state().is_done());
check_files(
&object_store,
@@ -972,6 +1066,7 @@ mod tests {
data: "exceed_max_retry_later".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
exec_fn,
rollback_fn: None,
};
let dir = create_temp_dir("exceed_max_retry_later");
@@ -995,6 +1090,85 @@ mod tests {
assert!(err.contains("Procedure retry exceeded max times"));
}
#[tokio::test]
async fn test_rollback_exceed_max_retry_later() {
let exec_fn =
|_| async { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed();
let rollback_fn = move |_| {
async move { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed()
};
let exceed_max_retry_later = ProcedureAdapter {
data: "exceed_max_rollback".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
exec_fn,
rollback_fn: Some(Box::new(rollback_fn)),
};
let dir = create_temp_dir("exceed_max_rollback");
let meta = exceed_max_retry_later.new_meta(ROOT_ID);
let object_store = test_util::new_object_store(&dir);
let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
let mut runner = new_runner(
meta.clone(),
Box::new(exceed_max_retry_later),
procedure_store,
);
runner.manager_ctx.start();
runner.exponential_builder = ExponentialBuilder::default()
.with_min_delay(Duration::from_millis(1))
.with_max_times(3);
// Run the runner and execute the procedure.
runner.execute_procedure_in_loop().await;
let err = meta.state().error().unwrap().to_string();
assert!(err.contains("Procedure rollback exceeded max times"));
}
#[tokio::test]
async fn test_rollback_after_retry_fail() {
let exec_fn = move |_| {
async move { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed()
};
let (tx, mut rx) = mpsc::channel(1);
let rollback_fn = move |_| {
let tx = tx.clone();
async move {
tx.send(()).await.unwrap();
Ok(())
}
.boxed()
};
let retry_later = ProcedureAdapter {
data: "rollback_after_retry_fail".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
exec_fn,
rollback_fn: Some(Box::new(rollback_fn)),
};
let dir = create_temp_dir("retry_later");
let meta = retry_later.new_meta(ROOT_ID);
let ctx = context_without_provider(meta.id);
let object_store = test_util::new_object_store(&dir);
let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone());
runner.manager_ctx.start();
runner.exponential_builder = ExponentialBuilder::default()
.with_min_delay(Duration::from_millis(1))
.with_max_times(3);
// Run the runner and execute the procedure.
runner.execute_procedure_in_loop().await;
rx.recv().await.unwrap();
assert_eq!(rx.try_recv().unwrap_err(), mpsc::error::TryRecvError::Empty);
check_files(
&object_store,
&procedure_store,
ctx.procedure_id,
&["0000000000.rollback"],
)
.await;
}
#[tokio::test]
async fn test_child_error() {
let mut times = 0;
@@ -1013,6 +1187,7 @@ mod tests {
data: "fail".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table.region-0"),
exec_fn,
rollback_fn: None,
};
Ok(Status::Suspended {
@@ -1047,6 +1222,7 @@ mod tests {
data: "parent".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
exec_fn,
rollback_fn: None,
};
let dir = create_temp_dir("child_err");

View File

@@ -23,7 +23,7 @@ use smallvec::{smallvec, SmallVec};
use snafu::{ResultExt, Snafu};
use uuid::Uuid;
use crate::error::{Error, Result};
use crate::error::{self, Error, Result};
use crate::watcher::Watcher;
pub type Output = Arc<dyn Any + Send + Sync>;
@@ -125,6 +125,18 @@ pub trait Procedure: Send {
/// The implementation must be idempotent.
async fn execute(&mut self, ctx: &Context) -> Result<Status>;
/// Rollback the failed procedure.
///
/// The implementation must be idempotent.
async fn rollback(&mut self, _: &Context) -> Result<()> {
error::RollbackNotSupportedSnafu {}.fail()
}
/// Indicates whether it supports rolling back the procedure.
fn rollback_supported(&self) -> bool {
false
}
/// Dump the state of the procedure to a string.
fn dump(&self) -> Result<String>;
@@ -289,6 +301,10 @@ pub enum ProcedureState {
Done { output: Option<Output> },
/// The procedure is failed and can be retried.
Retrying { error: Arc<Error> },
/// The procedure is failed and commits state before rolling back the procedure.
PrepareRollback { error: Arc<Error> },
/// The procedure is failed and can be rollback.
RollingBack { error: Arc<Error> },
/// The procedure is failed and cannot proceed anymore.
Failed { error: Arc<Error> },
}
@@ -299,6 +315,16 @@ impl ProcedureState {
ProcedureState::Failed { error }
}
/// Returns a [ProcedureState] with prepare rollback state.
pub fn prepare_rollback(error: Arc<Error>) -> ProcedureState {
ProcedureState::PrepareRollback { error }
}
/// Returns a [ProcedureState] with rolling back state.
pub fn rolling_back(error: Arc<Error>) -> ProcedureState {
ProcedureState::RollingBack { error }
}
/// Returns a [ProcedureState] with retrying state.
pub fn retrying(error: Arc<Error>) -> ProcedureState {
ProcedureState::Retrying { error }
@@ -324,16 +350,34 @@ impl ProcedureState {
matches!(self, ProcedureState::Retrying { .. })
}
/// Returns true if the procedure state is rolling back.
pub fn is_rolling_back(&self) -> bool {
matches!(self, ProcedureState::RollingBack { .. })
}
/// Returns true if the procedure state is prepare rollback.
pub fn is_prepare_rollback(&self) -> bool {
matches!(self, ProcedureState::PrepareRollback { .. })
}
/// Returns the error.
pub fn error(&self) -> Option<&Arc<Error>> {
match self {
ProcedureState::Failed { error } => Some(error),
ProcedureState::Retrying { error } => Some(error),
ProcedureState::RollingBack { error } => Some(error),
_ => None,
}
}
}
/// The initial procedure state.
#[derive(Debug, Clone)]
pub enum InitProcedureState {
Running,
RollingBack,
}
// TODO(yingwen): Shutdown
/// `ProcedureManager` executes [Procedure] submitted to it.
#[async_trait]

View File

@@ -50,6 +50,9 @@ pub struct ProcedureMessage {
pub parent_id: Option<ProcedureId>,
/// Current step.
pub step: u32,
/// Errors raised during the procedure.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
}
/// Procedure storage layer.
@@ -85,6 +88,7 @@ impl ProcedureStore {
data,
parent_id,
step,
error: None,
};
let key = ParsedKey {
prefix: &self.proc_path,
@@ -122,16 +126,19 @@ impl ProcedureStore {
pub(crate) async fn rollback_procedure(
&self,
procedure_id: ProcedureId,
step: u32,
message: ProcedureMessage,
) -> Result<()> {
let key = ParsedKey {
prefix: &self.proc_path,
procedure_id,
step,
step: message.step,
key_type: KeyType::Rollback,
}
.to_string();
self.store.put(&key, Vec::new()).await?;
self.store
.put(&key, serde_json::to_vec(&message).context(ToJsonSnafu)?)
.await?;
Ok(())
}
@@ -176,11 +183,18 @@ impl ProcedureStore {
Ok(())
}
/// Load procedures from the storage. Returns a map of uncommitted procedures and a list
/// of finished procedures' ids.
/// Load procedures from the storage.
/// Returns:
/// - a map of uncommitted procedures
/// - a map of rolling back procedures
/// - a list of finished procedures' ids
pub(crate) async fn load_messages(
&self,
) -> Result<(HashMap<ProcedureId, ProcedureMessage>, Vec<ProcedureId>)> {
) -> Result<(
HashMap<ProcedureId, ProcedureMessage>,
HashMap<ProcedureId, ProcedureMessage>,
Vec<ProcedureId>,
)> {
// Track the key-value pair by procedure id.
let mut procedure_key_values: HashMap<_, (ParsedKey, Vec<u8>)> = HashMap::new();
@@ -204,21 +218,33 @@ impl ProcedureStore {
}
let mut messages = HashMap::with_capacity(procedure_key_values.len());
let mut rollback_messages = HashMap::new();
let mut finished_ids = Vec::new();
for (procedure_id, (parsed_key, value)) in procedure_key_values {
if parsed_key.key_type == KeyType::Step {
let Some(message) = self.load_one_message(&parsed_key, &value) else {
// We don't abort the loading process and just ignore errors to ensure all remaining
// procedures are loaded.
continue;
};
let _ = messages.insert(procedure_id, message);
} else {
finished_ids.push(procedure_id);
match parsed_key.key_type {
KeyType::Step => {
let Some(message) = self.load_one_message(&parsed_key, &value) else {
// We don't abort the loading process and just ignore errors to ensure all remaining
// procedures are loaded.
continue;
};
let _ = messages.insert(procedure_id, message);
}
KeyType::Commit => {
finished_ids.push(procedure_id);
}
KeyType::Rollback => {
let Some(message) = self.load_one_message(&parsed_key, &value) else {
// We don't abort the loading process and just ignore errors to ensure all remaining
// procedures are loaded.
continue;
};
let _ = rollback_messages.insert(procedure_id, message);
}
}
}
Ok((messages, finished_ids))
Ok((messages, rollback_messages, finished_ids))
}
fn load_one_message(&self, key: &ParsedKey, value: &[u8]) -> Option<ProcedureMessage> {
@@ -430,6 +456,7 @@ mod tests {
data: "no parent id".to_string(),
parent_id: None,
step: 4,
error: None,
};
let json = serde_json::to_string(&message).unwrap();
@@ -490,8 +517,9 @@ mod tests {
.await
.unwrap();
let (messages, finished) = store.load_messages().await.unwrap();
let (messages, rollback_messages, finished) = store.load_messages().await.unwrap();
assert_eq!(1, messages.len());
assert!(rollback_messages.is_empty());
assert!(finished.is_empty());
let msg = messages.get(&procedure_id).unwrap();
let expect = ProcedureMessage {
@@ -499,6 +527,7 @@ mod tests {
data: "test store procedure".to_string(),
parent_id: None,
step: 0,
error: None,
};
assert_eq!(expect, *msg);
}
@@ -518,8 +547,9 @@ mod tests {
.unwrap();
store.commit_procedure(procedure_id, 1).await.unwrap();
let (messages, finished) = store.load_messages().await.unwrap();
let (messages, rollback_messages, finished) = store.load_messages().await.unwrap();
assert!(messages.is_empty());
assert!(rollback_messages.is_empty());
assert_eq!(&[procedure_id], &finished[..]);
}
@@ -533,14 +563,32 @@ mod tests {
let type_name = procedure.type_name().to_string();
let data = procedure.dump().unwrap();
store
.store_procedure(procedure_id, 0, type_name, data, None)
.store_procedure(
procedure_id,
0,
type_name.to_string(),
data.to_string(),
None,
)
.await
.unwrap();
let message = ProcedureMessage {
type_name,
data,
parent_id: None,
step: 1,
error: None,
};
store
.rollback_procedure(procedure_id, message)
.await
.unwrap();
store.rollback_procedure(procedure_id, 1).await.unwrap();
let (messages, finished) = store.load_messages().await.unwrap();
let (messages, rollback_messages, finished) = store.load_messages().await.unwrap();
assert!(messages.is_empty());
assert_eq!(&[procedure_id], &finished[..]);
assert_eq!(1, rollback_messages.len());
assert!(finished.is_empty());
assert!(rollback_messages.contains_key(&procedure_id));
}
#[tokio::test]
@@ -565,8 +613,9 @@ mod tests {
store.delete_procedure(procedure_id).await.unwrap();
let (messages, finished) = store.load_messages().await.unwrap();
let (messages, rollback_messages, finished) = store.load_messages().await.unwrap();
assert!(messages.is_empty());
assert!(rollback_messages.is_empty());
assert!(finished.is_empty());
}
@@ -595,8 +644,9 @@ mod tests {
store.delete_procedure(procedure_id).await.unwrap();
let (messages, finished) = store.load_messages().await.unwrap();
let (messages, rollback_messages, finished) = store.load_messages().await.unwrap();
assert!(messages.is_empty());
assert!(rollback_messages.is_empty());
assert!(finished.is_empty());
}
@@ -657,8 +707,9 @@ mod tests {
.await
.unwrap();
let (messages, finished) = store.load_messages().await.unwrap();
let (messages, rollback_messages, finished) = store.load_messages().await.unwrap();
assert_eq!(2, messages.len());
assert!(rollback_messages.is_empty());
assert_eq!(1, finished.len());
let msg = messages.get(&id0).unwrap();

View File

@@ -37,6 +37,12 @@ pub async fn wait(watcher: &mut Watcher) -> Result<Option<Output>> {
ProcedureState::Retrying { error } => {
debug!("retrying, source: {}", error)
}
ProcedureState::RollingBack { error } => {
debug!("rolling back, source: {:?}", error)
}
ProcedureState::PrepareRollback { error } => {
debug!("commit rollback, source: {}", error)
}
}
}
}