diff --git a/Cargo.lock b/Cargo.lock index 36bca2a89a..b1c18ed53f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3781,7 +3781,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=1bd2398b686e5ac6c1eef6daf615867ce27f75c1#1bd2398b686e5ac6c1eef6daf615867ce27f75c1" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=c00c73b76ed20a603e4d29fb96cb4e6ef987bba7#c00c73b76ed20a603e4d29fb96cb4e6ef987bba7" dependencies = [ "prost 0.12.3", "serde", diff --git a/Cargo.toml b/Cargo.toml index 70899255cd..b6efa57319 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -104,7 +104,7 @@ etcd-client = "0.12" fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "1bd2398b686e5ac6c1eef6daf615867ce27f75c1" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "c00c73b76ed20a603e4d29fb96cb4e6ef987bba7" } humantime = "2.1" humantime-serde = "1.1" itertools = "0.10" diff --git a/src/common/meta/src/rpc/procedure.rs b/src/common/meta/src/rpc/procedure.rs index b4de8747df..6dabe899a8 100644 --- a/src/common/meta/src/rpc/procedure.rs +++ b/src/common/meta/src/rpc/procedure.rs @@ -56,6 +56,12 @@ pub fn procedure_state_to_pb_response(state: &ProcedureState) -> PbProcedureStat ProcedureState::Done { .. } => (PbProcedureStatus::Done, String::default()), ProcedureState::Retrying { error } => (PbProcedureStatus::Retrying, error.to_string()), ProcedureState::Failed { error } => (PbProcedureStatus::Failed, error.to_string()), + ProcedureState::PrepareRollback { error } => { + (PbProcedureStatus::PrepareRollback, error.to_string()) + } + ProcedureState::RollingBack { error } => { + (PbProcedureStatus::RollingBack, error.to_string()) + } }; PbProcedureStateResponse { diff --git a/src/common/procedure/src/error.rs b/src/common/procedure/src/error.rs index 98ecbfef25..6c881db9b1 100644 --- a/src/common/procedure/src/error.rs +++ b/src/common/procedure/src/error.rs @@ -104,12 +104,24 @@ pub enum Error { location: Location, }, + #[snafu(display("Rollback Procedure recovered: {error}"))] + RollbackProcedureRecovered { error: String, location: Location }, + #[snafu(display("Procedure retry exceeded max times, procedure_id: {}", procedure_id))] RetryTimesExceeded { source: Arc, procedure_id: ProcedureId, }, + #[snafu(display( + "Procedure rollback exceeded max times, procedure_id: {}", + procedure_id + ))] + RollbackTimesExceeded { + source: Arc, + procedure_id: ProcedureId, + }, + #[snafu(display("Corrupted data, error: "))] CorruptedData { #[snafu(source)] @@ -145,6 +157,9 @@ pub enum Error { #[snafu(display("Unexpected: {err_msg}"))] Unexpected { location: Location, err_msg: String }, + + #[snafu(display("Not support to rollback the procedure"))] + RollbackNotSupported { location: Location }, } pub type Result = std::result::Result; @@ -161,9 +176,12 @@ impl ErrorExt for Error { | Error::DeleteState { .. } | Error::FromJson { .. } | Error::RetryTimesExceeded { .. } + | Error::RollbackTimesExceeded { .. } | Error::RetryLater { .. } | Error::WaitWatcher { .. } - | Error::ManagerNotStart { .. } => StatusCode::Internal, + | Error::ManagerNotStart { .. } + | Error::RollbackProcedureRecovered { .. } + | Error::RollbackNotSupported { .. } => StatusCode::Internal, Error::LoaderConflict { .. } | Error::DuplicateProcedure { .. } => { StatusCode::InvalidArguments } diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index 9602d0a87b..4e13a82b24 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -31,11 +31,11 @@ use tokio::sync::{Mutex as TokioMutex, Notify}; use self::rwlock::KeyRwLock; use crate::error::{ - DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ManagerNotStartSnafu, Result, + self, DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ManagerNotStartSnafu, Result, StartRemoveOutdatedMetaTaskSnafu, StopRemoveOutdatedMetaTaskSnafu, }; use crate::local::runner::Runner; -use crate::procedure::BoxedProcedureLoader; +use crate::procedure::{BoxedProcedureLoader, InitProcedureState}; use crate::store::{ProcedureMessage, ProcedureStore, StateStoreRef}; use crate::{ BoxedProcedure, ContextProvider, LockKey, ProcedureId, ProcedureManager, ProcedureState, @@ -72,8 +72,13 @@ pub(crate) struct ProcedureMeta { } impl ProcedureMeta { - fn new(id: ProcedureId, parent_id: Option, lock_key: LockKey) -> ProcedureMeta { - let (state_sender, state_receiver) = watch::channel(ProcedureState::Running); + fn new( + id: ProcedureId, + procedure_state: ProcedureState, + parent_id: Option, + lock_key: LockKey, + ) -> ProcedureMeta { + let (state_sender, state_receiver) = watch::channel(procedure_state); ProcedureMeta { id, parent_id, @@ -424,12 +429,18 @@ impl LocalManager { fn submit_root( &self, procedure_id: ProcedureId, + procedure_state: ProcedureState, step: u32, procedure: BoxedProcedure, ) -> Result { ensure!(self.manager_ctx.running(), ManagerNotStartSnafu); - let meta = Arc::new(ProcedureMeta::new(procedure_id, None, procedure.lock_key())); + let meta = Arc::new(ProcedureMeta::new( + procedure_id, + procedure_state, + None, + procedure.lock_key(), + )); let runner = Runner { meta: meta.clone(), procedure, @@ -468,13 +479,11 @@ impl LocalManager { Ok(watcher) } - /// Recovers unfinished procedures and reruns them. - async fn recover(&self) -> Result<()> { - logging::info!("LocalManager start to recover"); - let recover_start = Instant::now(); - - let (messages, finished_ids) = self.procedure_store.load_messages().await?; - + fn submit_recovered_messages( + &self, + messages: HashMap, + init_state: InitProcedureState, + ) { for (procedure_id, message) in &messages { if message.parent_id.is_none() { // This is the root procedure. We only submit the root procedure as it will @@ -494,8 +503,21 @@ impl LocalManager { loaded_procedure.step ); + let procedure_state = match init_state { + InitProcedureState::RollingBack => ProcedureState::RollingBack { + error: Arc::new( + error::RollbackProcedureRecoveredSnafu { + error: message.error.clone().unwrap_or("Unknown error".to_string()), + } + .build(), + ), + }, + InitProcedureState::Running => ProcedureState::Running, + }; + if let Err(e) = self.submit_root( *procedure_id, + procedure_state, loaded_procedure.step, loaded_procedure.procedure, ) { @@ -503,6 +525,18 @@ impl LocalManager { } } } + } + + /// Recovers unfinished procedures and reruns them. + async fn recover(&self) -> Result<()> { + logging::info!("LocalManager start to recover"); + let recover_start = Instant::now(); + + let (messages, rollback_messages, finished_ids) = + self.procedure_store.load_messages().await?; + // Submits recovered messages first. + self.submit_recovered_messages(rollback_messages, InitProcedureState::RollingBack); + self.submit_recovered_messages(messages, InitProcedureState::Running); if !finished_ids.is_empty() { logging::info!( @@ -587,7 +621,12 @@ impl ProcedureManager for LocalManager { DuplicateProcedureSnafu { procedure_id } ); - self.submit_root(procedure.id, 0, procedure.procedure) + self.submit_root( + procedure.id, + ProcedureState::Running, + 0, + procedure.procedure, + ) } async fn procedure_state(&self, procedure_id: ProcedureId) -> Result> { @@ -626,7 +665,12 @@ pub(crate) mod test_util { use super::*; pub(crate) fn procedure_meta_for_test() -> ProcedureMeta { - ProcedureMeta::new(ProcedureId::random(), None, LockKey::default()) + ProcedureMeta::new( + ProcedureId::random(), + ProcedureState::Running, + None, + LockKey::default(), + ) } pub(crate) fn new_object_store(dir: &TempDir) -> ObjectStore { @@ -914,6 +958,14 @@ mod tests { } } + async fn rollback(&mut self, _: &Context) -> Result<()> { + Ok(()) + } + + fn rollback_supported(&self) -> bool { + true + } + fn dump(&self) -> Result { Ok(String::new()) } @@ -923,24 +975,29 @@ mod tests { } } - let check_procedure = |procedure| { - async { - let procedure_id = ProcedureId::random(); - let mut watcher = manager - .submit(ProcedureWithId { - id: procedure_id, - procedure: Box::new(procedure), - }) - .await - .unwrap(); - // Wait for the notification. - watcher.changed().await.unwrap(); - assert!(watcher.borrow().is_failed()); - } + let check_procedure = |procedure| async { + let procedure_id = ProcedureId::random(); + manager + .submit(ProcedureWithId { + id: procedure_id, + procedure: Box::new(procedure), + }) + .await + .unwrap() }; - check_procedure(MockProcedure { panic: false }).await; - check_procedure(MockProcedure { panic: true }).await; + let mut watcher = check_procedure(MockProcedure { panic: false }).await; + // Wait for the notification. + watcher.changed().await.unwrap(); + assert!(watcher.borrow().is_prepare_rollback()); + watcher.changed().await.unwrap(); + assert!(watcher.borrow().is_rolling_back()); + watcher.changed().await.unwrap(); + assert!(watcher.borrow().is_failed()); + // The runner won't rollback a panicked procedure. + let mut watcher = check_procedure(MockProcedure { panic: true }).await; + watcher.changed().await.unwrap(); + assert!(watcher.borrow().is_failed()); } #[tokio::test] diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs index bd866ea1d4..ec40134677 100644 --- a/src/common/procedure/src/local/runner.rs +++ b/src/common/procedure/src/local/runner.rs @@ -23,37 +23,9 @@ use super::rwlock::OwnedKeyRwLockGuard; use crate::error::{self, ProcedurePanicSnafu, Result}; use crate::local::{ManagerContext, ProcedureMeta, ProcedureMetaRef}; use crate::procedure::{Output, StringKey}; -use crate::store::ProcedureStore; -use crate::ProcedureState::Retrying; +use crate::store::{ProcedureMessage, ProcedureStore}; use crate::{BoxedProcedure, Context, Error, ProcedureId, ProcedureState, ProcedureWithId, Status}; -#[derive(Debug)] -enum ExecResult { - Continue, - Done, - RetryLater, - Failed, -} - -#[cfg(test)] -impl ExecResult { - fn is_continue(&self) -> bool { - matches!(self, ExecResult::Continue) - } - - fn is_done(&self) -> bool { - matches!(self, ExecResult::Done) - } - - fn is_retry_later(&self) -> bool { - matches!(self, ExecResult::RetryLater) - } - - fn is_failed(&self) -> bool { - matches!(self, ExecResult::Failed) - } -} - /// A guard to cleanup procedure state. struct ProcedureGuard { meta: ProcedureMetaRef, @@ -208,129 +180,164 @@ impl Runner { async fn execute_once_with_retry(&mut self, ctx: &Context) { let mut retry = self.exponential_builder.build(); let mut retry_times = 0; + + let mut rollback = self.exponential_builder.build(); + let mut rollback_times = 0; + loop { // Don't store state if `ProcedureManager` is stopped. if !self.running() { - self.meta.set_state(ProcedureState::Failed { - error: Arc::new(error::ManagerNotStartSnafu {}.build()), - }); + self.meta.set_state(ProcedureState::failed(Arc::new( + error::ManagerNotStartSnafu {}.build(), + ))); return; } - match self.execute_once(ctx).await { - ExecResult::Done | ExecResult::Failed => return, - ExecResult::Continue => (), - ExecResult::RetryLater => { + let state = self.meta.state(); + match state { + ProcedureState::Running => {} + ProcedureState::Retrying { error } => { retry_times += 1; if let Some(d) = retry.next() { self.wait_on_err(d, retry_times).await; } else { - assert!(self.meta.state().is_retrying()); - if let Retrying { error } = self.meta.state() { - self.meta.set_state(ProcedureState::failed(Arc::new( + self.meta + .set_state(ProcedureState::prepare_rollback(Arc::new( Error::RetryTimesExceeded { - source: error, + source: error.clone(), procedure_id: self.meta.id, }, - ))) - } + ))); + } + } + ProcedureState::PrepareRollback { error } + | ProcedureState::RollingBack { error } => { + rollback_times += 1; + if let Some(d) = rollback.next() { + self.wait_on_err(d, rollback_times).await; + } else { + self.meta.set_state(ProcedureState::failed(Arc::new( + Error::RollbackTimesExceeded { + source: error.clone(), + procedure_id: self.meta.id, + }, + ))); return; } } + ProcedureState::Done { .. } => return, + ProcedureState::Failed { .. } => return, + } + self.execute_once(ctx).await; + } + } + + async fn rollback(&mut self, ctx: &Context, err: Arc) { + if self.procedure.rollback_supported() { + if let Err(e) = self.procedure.rollback(ctx).await { + self.meta + .set_state(ProcedureState::rolling_back(Arc::new(e))); + return; } } + self.meta.set_state(ProcedureState::failed(err)); } - async fn rollback(&mut self, error: Arc) -> ExecResult { - if let Err(e) = self.rollback_procedure().await { - self.rolling_back = true; - self.meta.set_state(ProcedureState::retrying(Arc::new(e))); - return ExecResult::RetryLater; + async fn prepare_rollback(&mut self, err: Arc) { + if let Err(e) = self.write_procedure_state(err.to_string()).await { + self.meta + .set_state(ProcedureState::prepare_rollback(Arc::new(e))); + return; + } + if self.procedure.rollback_supported() { + self.meta.set_state(ProcedureState::rolling_back(err)); + } else { + self.meta.set_state(ProcedureState::failed(err)); } - self.meta.set_state(ProcedureState::failed(error)); - ExecResult::Failed } - async fn execute_once(&mut self, ctx: &Context) -> ExecResult { - // if rolling_back, there is no need to execute again. - if self.rolling_back { - // We can definitely get the previous error here. - let state = self.meta.state(); - let err = state.error().unwrap(); - return self.rollback(err.clone()).await; - } - match self.procedure.execute(ctx).await { - Ok(status) => { - logging::debug!( - "Execute procedure {}-{} once, status: {:?}, need_persist: {}", - self.procedure.type_name(), - self.meta.id, - status, - status.need_persist(), - ); + async fn execute_once(&mut self, ctx: &Context) { + match self.meta.state() { + ProcedureState::Running | ProcedureState::Retrying { .. } => { + match self.procedure.execute(ctx).await { + Ok(status) => { + logging::debug!( + "Execute procedure {}-{} once, status: {:?}, need_persist: {}", + self.procedure.type_name(), + self.meta.id, + status, + status.need_persist(), + ); - // Don't store state if `ProcedureManager` is stopped. - if !self.running() { - self.meta.set_state(ProcedureState::Failed { - error: Arc::new(error::ManagerNotStartSnafu {}.build()), - }); - return ExecResult::Failed; - } - - if status.need_persist() { - if let Err(err) = self.persist_procedure().await { - self.meta.set_state(ProcedureState::retrying(Arc::new(err))); - return ExecResult::RetryLater; - } - } - - match status { - Status::Executing { .. } => (), - Status::Suspended { subprocedures, .. } => { - self.on_suspended(subprocedures).await; - } - Status::Done { output } => { - if let Err(e) = self.commit_procedure().await { - self.meta.set_state(ProcedureState::retrying(Arc::new(e))); - return ExecResult::RetryLater; + // Don't store state if `ProcedureManager` is stopped. + if !self.running() { + self.meta.set_state(ProcedureState::failed(Arc::new( + error::ManagerNotStartSnafu {}.build(), + ))); + return; } - self.done(output); - return ExecResult::Done; + if status.need_persist() { + if let Err(err) = self.persist_procedure().await { + self.meta.set_state(ProcedureState::retrying(Arc::new(err))); + return; + } + } + + match status { + Status::Executing { .. } => (), + Status::Suspended { subprocedures, .. } => { + self.on_suspended(subprocedures).await; + } + Status::Done { output } => { + if let Err(e) = self.commit_procedure().await { + self.meta.set_state(ProcedureState::retrying(Arc::new(e))); + return; + } + + self.done(output); + } + } + } + Err(e) => { + logging::error!( + e; + "Failed to execute procedure {}-{}, retry: {}", + self.procedure.type_name(), + self.meta.id, + e.is_retry_later(), + ); + + // Don't store state if `ProcedureManager` is stopped. + if !self.running() { + self.meta.set_state(ProcedureState::failed(Arc::new( + error::ManagerNotStartSnafu {}.build(), + ))); + return; + } + + if e.is_retry_later() { + self.meta.set_state(ProcedureState::retrying(Arc::new(e))); + return; + } + + self.meta + .set_state(ProcedureState::prepare_rollback(Arc::new(e))); } } - - ExecResult::Continue - } - Err(e) => { - logging::error!( - e; - "Failed to execute procedure {}-{}, retry: {}", - self.procedure.type_name(), - self.meta.id, - e.is_retry_later(), - ); - - // Don't store state if `ProcedureManager` is stopped. - if !self.running() { - self.meta.set_state(ProcedureState::Failed { - error: Arc::new(error::ManagerNotStartSnafu {}.build()), - }); - return ExecResult::Failed; - } - - if e.is_retry_later() { - self.meta.set_state(ProcedureState::retrying(Arc::new(e))); - return ExecResult::RetryLater; - } - - // Write rollback key so we can skip this procedure while recovering procedures. - self.rollback(Arc::new(e)).await } + ProcedureState::PrepareRollback { error } => self.prepare_rollback(error).await, + ProcedureState::RollingBack { error } => self.rollback(ctx, error).await, + ProcedureState::Failed { .. } | ProcedureState::Done { .. } => (), } } /// Submit a subprocedure with specific `procedure_id`. - fn submit_subprocedure(&self, procedure_id: ProcedureId, mut procedure: BoxedProcedure) { + fn submit_subprocedure( + &self, + procedure_id: ProcedureId, + procedure_state: ProcedureState, + mut procedure: BoxedProcedure, + ) { if self.manager_ctx.contains_procedure(procedure_id) { // If the parent has already submitted this procedure, don't submit it again. return; @@ -350,6 +357,7 @@ impl Runner { let meta = Arc::new(ProcedureMeta::new( procedure_id, + procedure_state, Some(self.meta.id), procedure.lock_key(), )); @@ -407,7 +415,11 @@ impl Runner { subprocedure.id, ); - self.submit_subprocedure(subprocedure.id, subprocedure.procedure); + self.submit_subprocedure( + subprocedure.id, + ProcedureState::Running, + subprocedure.procedure, + ); } logging::info!( @@ -469,9 +481,19 @@ impl Runner { Ok(()) } - async fn rollback_procedure(&mut self) -> Result<()> { + async fn write_procedure_state(&mut self, error: String) -> Result<()> { + // Persists procedure state + let type_name = self.procedure.type_name().to_string(); + let data = self.procedure.dump()?; + let message = ProcedureMessage { + type_name, + data, + parent_id: self.meta.parent_id, + step: self.step, + error: Some(error), + }; self.store - .rollback_procedure(self.meta.id, self.step) + .rollback_procedure(self.meta.id, message) .await .map_err(|e| { logging::error!( @@ -510,6 +532,7 @@ mod tests { use futures_util::future::BoxFuture; use futures_util::FutureExt; use object_store::ObjectStore; + use tokio::sync::mpsc; use super::*; use crate::local::test_util; @@ -566,11 +589,13 @@ mod tests { } } - #[derive(Debug)] + type RollbackFn = Box BoxFuture<'static, Result<()>> + Send>; + struct ProcedureAdapter { data: String, lock_key: LockKey, exec_fn: F, + rollback_fn: Option, } impl ProcedureAdapter { @@ -597,6 +622,17 @@ mod tests { f.await } + async fn rollback(&mut self, ctx: &Context) -> Result<()> { + if let Some(f) = &mut self.rollback_fn { + return (f)(ctx.clone()).await; + } + Ok(()) + } + + fn rollback_supported(&self) -> bool { + self.rollback_fn.is_some() + } + fn dump(&self) -> Result { Ok(self.data.clone()) } @@ -623,6 +659,7 @@ mod tests { data: "normal".to_string(), lock_key: LockKey::single_exclusive("catalog.schema.table"), exec_fn, + rollback_fn: None, }; let dir = create_temp_dir("normal"); @@ -633,8 +670,9 @@ mod tests { let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone()); runner.manager_ctx.start(); - let res = runner.execute_once(&ctx).await; - assert!(res.is_continue(), "{res:?}"); + runner.execute_once(&ctx).await; + let state = runner.meta.state(); + assert!(state.is_running(), "{state:?}"); check_files( &object_store, &procedure_store, @@ -643,8 +681,9 @@ mod tests { ) .await; - let res = runner.execute_once(&ctx).await; - assert!(res.is_done(), "{res:?}"); + runner.execute_once(&ctx).await; + let state = runner.meta.state(); + assert!(state.is_done(), "{state:?}"); check_files( &object_store, &procedure_store, @@ -684,6 +723,7 @@ mod tests { data: "suspend".to_string(), lock_key: LockKey::single_exclusive("catalog.schema.table"), exec_fn, + rollback_fn: None, }; let dir = create_temp_dir("suspend"); @@ -694,8 +734,9 @@ mod tests { let mut runner = new_runner(meta, Box::new(suspend), procedure_store); runner.manager_ctx.start(); - let res = runner.execute_once(&ctx).await; - assert!(res.is_continue(), "{res:?}"); + runner.execute_once(&ctx).await; + let state = runner.meta.state(); + assert!(state.is_running(), "{state:?}"); } fn new_child_procedure(procedure_id: ProcedureId, keys: &[&str]) -> ProcedureWithId { @@ -716,6 +757,7 @@ mod tests { data: "child".to_string(), lock_key: LockKey::new_exclusive(keys.iter().map(|k| k.to_string())), exec_fn, + rollback_fn: None, }; ProcedureWithId { @@ -784,6 +826,7 @@ mod tests { data: "parent".to_string(), lock_key: LockKey::single_exclusive("catalog.schema.table"), exec_fn, + rollback_fn: None, }; let dir = create_temp_dir("parent"); @@ -830,6 +873,7 @@ mod tests { data: "normal".to_string(), lock_key: LockKey::single_exclusive("catalog.schema.table"), exec_fn, + rollback_fn: None, }; let dir = create_temp_dir("test_running_is_stopped"); @@ -840,8 +884,9 @@ mod tests { let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone()); runner.manager_ctx.start(); - let res = runner.execute_once(&ctx).await; - assert!(res.is_continue(), "{res:?}"); + runner.execute_once(&ctx).await; + let state = runner.meta.state(); + assert!(state.is_running(), "{state:?}"); check_files( &object_store, &procedure_store, @@ -851,8 +896,9 @@ mod tests { .await; runner.manager_ctx.stop(); - let res = runner.execute_once(&ctx).await; - assert!(res.is_failed()); + runner.execute_once(&ctx).await; + let state = runner.meta.state(); + assert!(state.is_failed(), "{state:?}"); // Shouldn't write any files check_files( &object_store, @@ -871,6 +917,7 @@ mod tests { data: "fail".to_string(), lock_key: LockKey::single_exclusive("catalog.schema.table"), exec_fn, + rollback_fn: None, }; let dir = create_temp_dir("test_running_is_stopped_on_error"); @@ -881,8 +928,9 @@ mod tests { let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone()); runner.manager_ctx.stop(); - let res = runner.execute_once(&ctx).await; - assert!(res.is_failed(), "{res:?}"); + runner.execute_once(&ctx).await; + let state = runner.meta.state(); + assert!(state.is_failed(), "{state:?}"); // Shouldn't write any files check_files(&object_store, &procedure_store, ctx.procedure_id, &[]).await; } @@ -895,6 +943,7 @@ mod tests { data: "fail".to_string(), lock_key: LockKey::single_exclusive("catalog.schema.table"), exec_fn, + rollback_fn: None, }; let dir = create_temp_dir("fail"); @@ -905,9 +954,53 @@ mod tests { let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store.clone()); runner.manager_ctx.start(); - let res = runner.execute_once(&ctx).await; - assert!(res.is_failed(), "{res:?}"); - assert!(meta.state().is_failed()); + runner.execute_once(&ctx).await; + let state = runner.meta.state(); + assert!(state.is_prepare_rollback(), "{state:?}"); + + runner.execute_once(&ctx).await; + let state = runner.meta.state(); + assert!(state.is_failed(), "{state:?}"); + check_files( + &object_store, + &procedure_store, + ctx.procedure_id, + &["0000000000.rollback"], + ) + .await; + } + + #[tokio::test] + async fn test_execute_with_rollback_on_error() { + let exec_fn = + |_| async { Err(Error::external(MockError::new(StatusCode::Unexpected))) }.boxed(); + let rollback_fn = move |_| async move { Ok(()) }.boxed(); + let fail = ProcedureAdapter { + data: "fail".to_string(), + lock_key: LockKey::single_exclusive("catalog.schema.table"), + exec_fn, + rollback_fn: Some(Box::new(rollback_fn)), + }; + + let dir = create_temp_dir("fail"); + let meta = fail.new_meta(ROOT_ID); + let ctx = context_without_provider(meta.id); + let object_store = test_util::new_object_store(&dir); + let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); + let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store.clone()); + runner.manager_ctx.start(); + + runner.execute_once(&ctx).await; + let state = runner.meta.state(); + assert!(state.is_prepare_rollback(), "{state:?}"); + + runner.execute_once(&ctx).await; + let state = runner.meta.state(); + assert!(state.is_rolling_back(), "{state:?}"); + + runner.execute_once(&ctx).await; + let state = runner.meta.state(); + assert!(state.is_failed(), "{state:?}"); check_files( &object_store, &procedure_store, @@ -937,6 +1030,7 @@ mod tests { data: "retry_later".to_string(), lock_key: LockKey::single_exclusive("catalog.schema.table"), exec_fn, + rollback_fn: None, }; let dir = create_temp_dir("retry_later"); @@ -946,13 +1040,13 @@ mod tests { let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone()); runner.manager_ctx.start(); + runner.execute_once(&ctx).await; + let state = runner.meta.state(); + assert!(state.is_retrying(), "{state:?}"); - let res = runner.execute_once(&ctx).await; - assert!(res.is_retry_later(), "{res:?}"); - assert!(meta.state().is_retrying()); - - let res = runner.execute_once(&ctx).await; - assert!(res.is_done(), "{res:?}"); + runner.execute_once(&ctx).await; + let state = runner.meta.state(); + assert!(state.is_done(), "{state:?}"); assert!(meta.state().is_done()); check_files( &object_store, @@ -972,6 +1066,7 @@ mod tests { data: "exceed_max_retry_later".to_string(), lock_key: LockKey::single_exclusive("catalog.schema.table"), exec_fn, + rollback_fn: None, }; let dir = create_temp_dir("exceed_max_retry_later"); @@ -995,6 +1090,85 @@ mod tests { assert!(err.contains("Procedure retry exceeded max times")); } + #[tokio::test] + async fn test_rollback_exceed_max_retry_later() { + let exec_fn = + |_| async { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed(); + let rollback_fn = move |_| { + async move { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed() + }; + let exceed_max_retry_later = ProcedureAdapter { + data: "exceed_max_rollback".to_string(), + lock_key: LockKey::single_exclusive("catalog.schema.table"), + exec_fn, + rollback_fn: Some(Box::new(rollback_fn)), + }; + + let dir = create_temp_dir("exceed_max_rollback"); + let meta = exceed_max_retry_later.new_meta(ROOT_ID); + let object_store = test_util::new_object_store(&dir); + let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); + let mut runner = new_runner( + meta.clone(), + Box::new(exceed_max_retry_later), + procedure_store, + ); + runner.manager_ctx.start(); + runner.exponential_builder = ExponentialBuilder::default() + .with_min_delay(Duration::from_millis(1)) + .with_max_times(3); + + // Run the runner and execute the procedure. + runner.execute_procedure_in_loop().await; + let err = meta.state().error().unwrap().to_string(); + assert!(err.contains("Procedure rollback exceeded max times")); + } + + #[tokio::test] + async fn test_rollback_after_retry_fail() { + let exec_fn = move |_| { + async move { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed() + }; + + let (tx, mut rx) = mpsc::channel(1); + let rollback_fn = move |_| { + let tx = tx.clone(); + async move { + tx.send(()).await.unwrap(); + Ok(()) + } + .boxed() + }; + let retry_later = ProcedureAdapter { + data: "rollback_after_retry_fail".to_string(), + lock_key: LockKey::single_exclusive("catalog.schema.table"), + exec_fn, + rollback_fn: Some(Box::new(rollback_fn)), + }; + + let dir = create_temp_dir("retry_later"); + let meta = retry_later.new_meta(ROOT_ID); + let ctx = context_without_provider(meta.id); + let object_store = test_util::new_object_store(&dir); + let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); + let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone()); + runner.manager_ctx.start(); + runner.exponential_builder = ExponentialBuilder::default() + .with_min_delay(Duration::from_millis(1)) + .with_max_times(3); + // Run the runner and execute the procedure. + runner.execute_procedure_in_loop().await; + rx.recv().await.unwrap(); + assert_eq!(rx.try_recv().unwrap_err(), mpsc::error::TryRecvError::Empty); + check_files( + &object_store, + &procedure_store, + ctx.procedure_id, + &["0000000000.rollback"], + ) + .await; + } + #[tokio::test] async fn test_child_error() { let mut times = 0; @@ -1013,6 +1187,7 @@ mod tests { data: "fail".to_string(), lock_key: LockKey::single_exclusive("catalog.schema.table.region-0"), exec_fn, + rollback_fn: None, }; Ok(Status::Suspended { @@ -1047,6 +1222,7 @@ mod tests { data: "parent".to_string(), lock_key: LockKey::single_exclusive("catalog.schema.table"), exec_fn, + rollback_fn: None, }; let dir = create_temp_dir("child_err"); diff --git a/src/common/procedure/src/procedure.rs b/src/common/procedure/src/procedure.rs index 434d54950e..709a8ac4e4 100644 --- a/src/common/procedure/src/procedure.rs +++ b/src/common/procedure/src/procedure.rs @@ -23,7 +23,7 @@ use smallvec::{smallvec, SmallVec}; use snafu::{ResultExt, Snafu}; use uuid::Uuid; -use crate::error::{Error, Result}; +use crate::error::{self, Error, Result}; use crate::watcher::Watcher; pub type Output = Arc; @@ -125,6 +125,18 @@ pub trait Procedure: Send { /// The implementation must be idempotent. async fn execute(&mut self, ctx: &Context) -> Result; + /// Rollback the failed procedure. + /// + /// The implementation must be idempotent. + async fn rollback(&mut self, _: &Context) -> Result<()> { + error::RollbackNotSupportedSnafu {}.fail() + } + + /// Indicates whether it supports rolling back the procedure. + fn rollback_supported(&self) -> bool { + false + } + /// Dump the state of the procedure to a string. fn dump(&self) -> Result; @@ -289,6 +301,10 @@ pub enum ProcedureState { Done { output: Option }, /// The procedure is failed and can be retried. Retrying { error: Arc }, + /// The procedure is failed and commits state before rolling back the procedure. + PrepareRollback { error: Arc }, + /// The procedure is failed and can be rollback. + RollingBack { error: Arc }, /// The procedure is failed and cannot proceed anymore. Failed { error: Arc }, } @@ -299,6 +315,16 @@ impl ProcedureState { ProcedureState::Failed { error } } + /// Returns a [ProcedureState] with prepare rollback state. + pub fn prepare_rollback(error: Arc) -> ProcedureState { + ProcedureState::PrepareRollback { error } + } + + /// Returns a [ProcedureState] with rolling back state. + pub fn rolling_back(error: Arc) -> ProcedureState { + ProcedureState::RollingBack { error } + } + /// Returns a [ProcedureState] with retrying state. pub fn retrying(error: Arc) -> ProcedureState { ProcedureState::Retrying { error } @@ -324,16 +350,34 @@ impl ProcedureState { matches!(self, ProcedureState::Retrying { .. }) } + /// Returns true if the procedure state is rolling back. + pub fn is_rolling_back(&self) -> bool { + matches!(self, ProcedureState::RollingBack { .. }) + } + + /// Returns true if the procedure state is prepare rollback. + pub fn is_prepare_rollback(&self) -> bool { + matches!(self, ProcedureState::PrepareRollback { .. }) + } + /// Returns the error. pub fn error(&self) -> Option<&Arc> { match self { ProcedureState::Failed { error } => Some(error), ProcedureState::Retrying { error } => Some(error), + ProcedureState::RollingBack { error } => Some(error), _ => None, } } } +/// The initial procedure state. +#[derive(Debug, Clone)] +pub enum InitProcedureState { + Running, + RollingBack, +} + // TODO(yingwen): Shutdown /// `ProcedureManager` executes [Procedure] submitted to it. #[async_trait] diff --git a/src/common/procedure/src/store.rs b/src/common/procedure/src/store.rs index e0a129c52a..cc96ed9f80 100644 --- a/src/common/procedure/src/store.rs +++ b/src/common/procedure/src/store.rs @@ -50,6 +50,9 @@ pub struct ProcedureMessage { pub parent_id: Option, /// Current step. pub step: u32, + /// Errors raised during the procedure. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub error: Option, } /// Procedure storage layer. @@ -85,6 +88,7 @@ impl ProcedureStore { data, parent_id, step, + error: None, }; let key = ParsedKey { prefix: &self.proc_path, @@ -122,16 +126,19 @@ impl ProcedureStore { pub(crate) async fn rollback_procedure( &self, procedure_id: ProcedureId, - step: u32, + message: ProcedureMessage, ) -> Result<()> { let key = ParsedKey { prefix: &self.proc_path, procedure_id, - step, + step: message.step, key_type: KeyType::Rollback, } .to_string(); - self.store.put(&key, Vec::new()).await?; + + self.store + .put(&key, serde_json::to_vec(&message).context(ToJsonSnafu)?) + .await?; Ok(()) } @@ -176,11 +183,18 @@ impl ProcedureStore { Ok(()) } - /// Load procedures from the storage. Returns a map of uncommitted procedures and a list - /// of finished procedures' ids. + /// Load procedures from the storage. + /// Returns: + /// - a map of uncommitted procedures + /// - a map of rolling back procedures + /// - a list of finished procedures' ids pub(crate) async fn load_messages( &self, - ) -> Result<(HashMap, Vec)> { + ) -> Result<( + HashMap, + HashMap, + Vec, + )> { // Track the key-value pair by procedure id. let mut procedure_key_values: HashMap<_, (ParsedKey, Vec)> = HashMap::new(); @@ -204,21 +218,33 @@ impl ProcedureStore { } let mut messages = HashMap::with_capacity(procedure_key_values.len()); + let mut rollback_messages = HashMap::new(); let mut finished_ids = Vec::new(); for (procedure_id, (parsed_key, value)) in procedure_key_values { - if parsed_key.key_type == KeyType::Step { - let Some(message) = self.load_one_message(&parsed_key, &value) else { - // We don't abort the loading process and just ignore errors to ensure all remaining - // procedures are loaded. - continue; - }; - let _ = messages.insert(procedure_id, message); - } else { - finished_ids.push(procedure_id); + match parsed_key.key_type { + KeyType::Step => { + let Some(message) = self.load_one_message(&parsed_key, &value) else { + // We don't abort the loading process and just ignore errors to ensure all remaining + // procedures are loaded. + continue; + }; + let _ = messages.insert(procedure_id, message); + } + KeyType::Commit => { + finished_ids.push(procedure_id); + } + KeyType::Rollback => { + let Some(message) = self.load_one_message(&parsed_key, &value) else { + // We don't abort the loading process and just ignore errors to ensure all remaining + // procedures are loaded. + continue; + }; + let _ = rollback_messages.insert(procedure_id, message); + } } } - Ok((messages, finished_ids)) + Ok((messages, rollback_messages, finished_ids)) } fn load_one_message(&self, key: &ParsedKey, value: &[u8]) -> Option { @@ -430,6 +456,7 @@ mod tests { data: "no parent id".to_string(), parent_id: None, step: 4, + error: None, }; let json = serde_json::to_string(&message).unwrap(); @@ -490,8 +517,9 @@ mod tests { .await .unwrap(); - let (messages, finished) = store.load_messages().await.unwrap(); + let (messages, rollback_messages, finished) = store.load_messages().await.unwrap(); assert_eq!(1, messages.len()); + assert!(rollback_messages.is_empty()); assert!(finished.is_empty()); let msg = messages.get(&procedure_id).unwrap(); let expect = ProcedureMessage { @@ -499,6 +527,7 @@ mod tests { data: "test store procedure".to_string(), parent_id: None, step: 0, + error: None, }; assert_eq!(expect, *msg); } @@ -518,8 +547,9 @@ mod tests { .unwrap(); store.commit_procedure(procedure_id, 1).await.unwrap(); - let (messages, finished) = store.load_messages().await.unwrap(); + let (messages, rollback_messages, finished) = store.load_messages().await.unwrap(); assert!(messages.is_empty()); + assert!(rollback_messages.is_empty()); assert_eq!(&[procedure_id], &finished[..]); } @@ -533,14 +563,32 @@ mod tests { let type_name = procedure.type_name().to_string(); let data = procedure.dump().unwrap(); store - .store_procedure(procedure_id, 0, type_name, data, None) + .store_procedure( + procedure_id, + 0, + type_name.to_string(), + data.to_string(), + None, + ) + .await + .unwrap(); + let message = ProcedureMessage { + type_name, + data, + parent_id: None, + step: 1, + error: None, + }; + store + .rollback_procedure(procedure_id, message) .await .unwrap(); - store.rollback_procedure(procedure_id, 1).await.unwrap(); - let (messages, finished) = store.load_messages().await.unwrap(); + let (messages, rollback_messages, finished) = store.load_messages().await.unwrap(); assert!(messages.is_empty()); - assert_eq!(&[procedure_id], &finished[..]); + assert_eq!(1, rollback_messages.len()); + assert!(finished.is_empty()); + assert!(rollback_messages.contains_key(&procedure_id)); } #[tokio::test] @@ -565,8 +613,9 @@ mod tests { store.delete_procedure(procedure_id).await.unwrap(); - let (messages, finished) = store.load_messages().await.unwrap(); + let (messages, rollback_messages, finished) = store.load_messages().await.unwrap(); assert!(messages.is_empty()); + assert!(rollback_messages.is_empty()); assert!(finished.is_empty()); } @@ -595,8 +644,9 @@ mod tests { store.delete_procedure(procedure_id).await.unwrap(); - let (messages, finished) = store.load_messages().await.unwrap(); + let (messages, rollback_messages, finished) = store.load_messages().await.unwrap(); assert!(messages.is_empty()); + assert!(rollback_messages.is_empty()); assert!(finished.is_empty()); } @@ -657,8 +707,9 @@ mod tests { .await .unwrap(); - let (messages, finished) = store.load_messages().await.unwrap(); + let (messages, rollback_messages, finished) = store.load_messages().await.unwrap(); assert_eq!(2, messages.len()); + assert!(rollback_messages.is_empty()); assert_eq!(1, finished.len()); let msg = messages.get(&id0).unwrap(); diff --git a/src/common/procedure/src/watcher.rs b/src/common/procedure/src/watcher.rs index de15b545c1..09c4d6de5c 100644 --- a/src/common/procedure/src/watcher.rs +++ b/src/common/procedure/src/watcher.rs @@ -37,6 +37,12 @@ pub async fn wait(watcher: &mut Watcher) -> Result> { ProcedureState::Retrying { error } => { debug!("retrying, source: {}", error) } + ProcedureState::RollingBack { error } => { + debug!("rolling back, source: {:?}", error) + } + ProcedureState::PrepareRollback { error } => { + debug!("commit rollback, source: {}", error) + } } } }