diff --git a/Cargo.lock b/Cargo.lock
index 4e07c2a9ee..8445d67bd1 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1592,6 +1592,7 @@ name = "common-procedure"
 version = "0.1.1"
 dependencies = [
  "async-trait",
+ "backon 0.4.0",
  "common-error",
  "common-runtime",
  "common-telemetry",
diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index f7e7193d87..c2f2ccca90 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -44,5 +44,7 @@ max_purge_tasks = 32
 
 # Procedure storage options, see `standalone.example.toml`.
 # [procedure.store]
-# type = 'File'
-# data_dir = '/tmp/greptimedb/procedure/'
+# type = "File"
+# data_dir = "/tmp/greptimedb/procedure/"
+# max_retry_times = 3
+# retry_delay = "500ms"
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index e05190dc91..c9a0d28f9f 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -114,3 +114,7 @@ max_purge_tasks = 32
 # type = "File"
 # # Procedure data path.
 # data_dir = "/tmp/greptimedb/procedure/"
+# # Procedure max retry times.
+# max_retry_times = 3
+# # Initial retry delay of procedures, increases exponentially.
+# retry_delay = "500ms"
diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs
index 1aa095e21b..89472690c7 100644
--- a/src/cmd/src/datanode.rs
+++ b/src/cmd/src/datanode.rs
@@ -150,7 +150,6 @@ impl TryFrom<StartCommand> for DatanodeOptions {
         if let Some(wal_dir) = cmd.wal_dir {
             opts.wal.dir = wal_dir;
         }
-
         if let Some(procedure_dir) = cmd.procedure_dir {
             opts.procedure = Some(ProcedureConfig::from_file_path(procedure_dir));
         }
diff --git a/src/common/procedure/Cargo.toml b/src/common/procedure/Cargo.toml
index 2d6ce54118..f1e5f66863 100644
--- a/src/common/procedure/Cargo.toml
+++ b/src/common/procedure/Cargo.toml
@@ -14,6 +14,7 @@ object-store = { path = "../../object-store" }
 serde.workspace = true
 serde_json = "1.0"
 smallvec = "1"
+backon = "0.4.0"
 snafu.workspace = true
 tokio.workspace = true
 uuid.workspace = true
diff --git a/src/common/procedure/src/error.rs b/src/common/procedure/src/error.rs
index 44d0da8b79..eca8c9f938 100644
--- a/src/common/procedure/src/error.rs
+++ b/src/common/procedure/src/error.rs
@@ -97,6 +97,16 @@ pub enum Error {
         source: Arc<Error>,
         backtrace: Backtrace,
     },
+
+    #[snafu(display(
+        "Procedure retry exceeded max times, procedure_id: {}, source: {}",
+        procedure_id,
+        source
+    ))]
+    RetryTimesExceeded {
+        source: Arc<Error>,
+        procedure_id: ProcedureId,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -111,6 +121,7 @@ impl ErrorExt for Error {
             | Error::ListState { .. }
             | Error::ReadState { .. }
             | Error::FromJson { .. }
+            | Error::RetryTimesExceeded { .. }
             | Error::RetryLater { .. }
             | Error::WaitWatcher { .. } => StatusCode::Internal,
             Error::LoaderConflict { .. } | Error::DuplicateProcedure { .. } => {
diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs
index 36a97b6e3e..05150df4b8 100644
--- a/src/common/procedure/src/local.rs
+++ b/src/common/procedure/src/local.rs
@@ -17,8 +17,10 @@ mod runner;
 
 use std::collections::{HashMap, VecDeque};
 use std::sync::{Arc, Mutex, RwLock};
+use std::time::Duration;
 
 use async_trait::async_trait;
+use backon::ExponentialBuilder;
 use common_telemetry::logging;
 use object_store::ObjectStore;
 use snafu::ensure;
@@ -291,12 +293,16 @@ impl ManagerContext {
 pub struct ManagerConfig {
     /// Object store
     pub object_store: ObjectStore,
+    pub max_retry_times: usize,
+    pub retry_delay: Duration,
 }
 
 /// A [ProcedureManager] that maintains procedure states locally.
 pub struct LocalManager {
     manager_ctx: Arc<ManagerContext>,
     state_store: StateStoreRef,
+    max_retry_times: usize,
+    retry_delay: Duration,
 }
 
 impl LocalManager {
@@ -305,6 +311,8 @@ impl LocalManager {
         LocalManager {
             manager_ctx: Arc::new(ManagerContext::new()),
             state_store: Arc::new(ObjectStateStore::new(config.object_store)),
+            max_retry_times: config.max_retry_times,
+            retry_delay: config.retry_delay,
         }
     }
 
@@ -321,7 +329,11 @@ impl LocalManager {
             procedure,
             manager_ctx: self.manager_ctx.clone(),
             step,
+            exponential_builder: ExponentialBuilder::default()
+                .with_min_delay(self.retry_delay)
+                .with_max_times(self.max_retry_times),
             store: ProcedureStore::new(self.state_store.clone()),
+            rolling_back: false,
         };
 
         let watcher = meta.state_receiver.clone();
@@ -543,6 +555,8 @@ mod tests {
         let dir = create_temp_dir("register");
         let config = ManagerConfig {
            object_store: test_util::new_object_store(&dir),
+            max_retry_times: 3,
+            retry_delay: Duration::from_millis(500),
         };
         let manager = LocalManager::new(config);
 
@@ -562,6 +576,8 @@
         let object_store = test_util::new_object_store(&dir);
         let config = ManagerConfig {
             object_store: object_store.clone(),
+            max_retry_times: 3,
+            retry_delay: Duration::from_millis(500),
         };
         let manager = LocalManager::new(config);
 
@@ -606,6 +622,8 @@
         let dir = create_temp_dir("submit");
         let config = ManagerConfig {
             object_store: test_util::new_object_store(&dir),
+            max_retry_times: 3,
+            retry_delay: Duration::from_millis(500),
         };
         let manager = LocalManager::new(config);
 
@@ -652,6 +670,8 @@
         let dir = create_temp_dir("on_err");
         let config = ManagerConfig {
             object_store: test_util::new_object_store(&dir),
+            max_retry_times: 3,
+            retry_delay: Duration::from_millis(500),
         };
         let manager = LocalManager::new(config);
 
diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs
index 1716f20d74..d665c5b5e3 100644
--- a/src/common/procedure/src/local/runner.rs
+++ b/src/common/procedure/src/local/runner.rs
@@ -15,15 +15,15 @@
 use std::sync::Arc;
 use std::time::Duration;
 
+use backon::{BackoffBuilder, ExponentialBuilder};
 use common_telemetry::logging;
 use tokio::time;
 
 use crate::error::{ProcedurePanicSnafu, Result};
 use crate::local::{ManagerContext, ProcedureMeta, ProcedureMetaRef};
 use crate::store::ProcedureStore;
-use crate::{BoxedProcedure, Context, ProcedureId, ProcedureState, ProcedureWithId, Status};
-
-const ERR_WAIT_DURATION: Duration = Duration::from_secs(30);
+use crate::ProcedureState::Retrying;
+use crate::{BoxedProcedure, Context, Error, ProcedureId, ProcedureState, ProcedureWithId, Status};
 
 #[derive(Debug)]
 enum ExecResult {
@@ -108,7 +108,9 @@ pub(crate) struct Runner {
     pub(crate) procedure: BoxedProcedure,
     pub(crate) manager_ctx: Arc<ManagerContext>,
     pub(crate) step: u32,
+    pub(crate) exponential_builder: ExponentialBuilder,
     pub(crate) store: ProcedureStore,
+    pub(crate) rolling_back: bool,
 }
 
 impl Runner {
@@ -164,18 +166,56 @@ impl Runner {
             provider: self.manager_ctx.clone(),
         };
 
+        self.rolling_back = false;
+        self.execute_once_with_retry(&ctx).await;
+    }
+
+    async fn execute_once_with_retry(&mut self, ctx: &Context) {
+        let mut retry = self.exponential_builder.build();
+        let mut retry_times = 0;
         loop {
-            match self.execute_once(&ctx).await {
-                ExecResult::Continue => (),
+            match self.execute_once(ctx).await {
                 ExecResult::Done | ExecResult::Failed => return,
+                ExecResult::Continue => (),
                 ExecResult::RetryLater => {
-                    self.wait_on_err().await;
+                    retry_times += 1;
+                    if let Some(d) = retry.next() {
+                        self.wait_on_err(d, retry_times).await;
+                    } else {
+                        assert!(self.meta.state().is_retrying());
+                        if let Retrying { error } = self.meta.state() {
+                            self.meta.set_state(ProcedureState::failed(Arc::new(
+                                Error::RetryTimesExceeded {
+                                    source: error,
+                                    procedure_id: self.meta.id,
+                                },
+                            )))
+                        }
+                        return;
+                    }
                 }
             }
         }
     }
 
+    async fn rollback(&mut self, error: Arc<Error>) -> ExecResult {
+        if let Err(e) = self.rollback_procedure().await {
+            self.rolling_back = true;
+            self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
+            return ExecResult::RetryLater;
+        }
+        self.meta.set_state(ProcedureState::failed(error));
+        ExecResult::Failed
+    }
+
     async fn execute_once(&mut self, ctx: &Context) -> ExecResult {
+        // If we are rolling back, there is no need to execute the procedure again.
+        if self.rolling_back {
+            // We can definitely get the previous error here.
+            let state = self.meta.state();
+            let err = state.error().unwrap();
+            return self.rollback(err.clone()).await;
+        }
         match self.procedure.execute(ctx).await {
             Ok(status) => {
                 logging::debug!(
@@ -186,8 +226,11 @@
                     status.need_persist(),
                 );
 
-                if status.need_persist() && self.persist_procedure().await.is_err() {
-                    return ExecResult::RetryLater;
+                if status.need_persist() {
+                    if let Err(err) = self.persist_procedure().await {
+                        self.meta.set_state(ProcedureState::retrying(Arc::new(err)));
+                        return ExecResult::RetryLater;
+                    }
                 }
 
                 match status {
@@ -196,7 +239,8 @@
                         self.on_suspended(subprocedures).await;
                     }
                     Status::Done => {
-                        if self.commit_procedure().await.is_err() {
+                        if let Err(e) = self.commit_procedure().await {
+                            self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
                             return ExecResult::RetryLater;
                         }
 
@@ -217,17 +261,12 @@
                 );
 
                 if e.is_retry_later() {
+                    self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
                     return ExecResult::RetryLater;
                 }
 
-                self.meta.set_state(ProcedureState::failed(Arc::new(e)));
-
                 // Write rollback key so we can skip this procedure while recovering procedures.
-                if self.rollback_procedure().await.is_err() {
-                    return ExecResult::RetryLater;
-                }
-
-                ExecResult::Failed
+                self.rollback(Arc::new(e)).await
             }
         }
     }
@@ -261,7 +300,9 @@
             procedure,
             manager_ctx: self.manager_ctx.clone(),
             step,
+            exponential_builder: self.exponential_builder.clone(),
             store: self.store.clone(),
+            rolling_back: false,
         };
 
         // Insert the procedure. We already check the procedure existence before inserting
@@ -285,8 +326,16 @@
         });
     }
 
-    async fn wait_on_err(&self) {
-        time::sleep(ERR_WAIT_DURATION).await;
+    /// Logs the retry and waits for the given delay before the next attempt.
+    async fn wait_on_err(&self, d: Duration, i: u64) {
+        logging::info!(
+            "Procedure {}-{} retry for the {} times after {} millis",
+            self.procedure.type_name(),
+            self.meta.id,
+            i,
+            d.as_millis(),
+        );
+        time::sleep(d).await;
     }
 
     async fn on_suspended(&self, subprocedures: Vec<ProcedureWithId>) {
@@ -416,7 +465,9 @@ mod tests {
             procedure,
             manager_ctx: Arc::new(ManagerContext::new()),
             step: 0,
+            exponential_builder: ExponentialBuilder::default(),
             store,
+            rolling_back: false,
         }
     }
 
@@ -744,7 +795,7 @@
         let res = runner.execute_once(&ctx).await;
         assert!(res.is_retry_later(), "{res:?}");
-        assert!(meta.state().is_running());
+        assert!(meta.state().is_retrying());
 
         let res = runner.execute_once(&ctx).await;
         assert!(res.is_done(), "{res:?}");
@@ -752,6 +803,36 @@
         check_files(&object_store, ctx.procedure_id, &["0000000000.commit"]).await;
     }
 
+    #[tokio::test]
+    async fn test_execute_exceed_max_retry_later() {
+        let exec_fn =
+            |_| async { Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) }.boxed();
+
+        let exceed_max_retry_later = ProcedureAdapter {
+            data: "exceed_max_retry_later".to_string(),
+            lock_key: LockKey::single("catalog.schema.table"),
+            exec_fn,
+        };
+
+        let dir = create_temp_dir("exceed_max_retry_later");
+        let meta = exceed_max_retry_later.new_meta(ROOT_ID);
+        let object_store = test_util::new_object_store(&dir);
+        let procedure_store = ProcedureStore::from(object_store.clone());
+        let mut runner = new_runner(
+            meta.clone(),
+            Box::new(exceed_max_retry_later),
+            procedure_store,
+        );
+        runner.exponential_builder = ExponentialBuilder::default()
+            .with_min_delay(Duration::from_millis(1))
+            .with_max_times(3);
+
+        // Run the runner and execute the procedure.
+        runner.execute_procedure_in_loop().await;
+        let err = meta.state().error().unwrap().to_string();
+        assert!(err.contains("Procedure retry exceeded max times"));
+    }
+
     #[tokio::test]
     async fn test_child_error() {
         let mut times = 0;
@@ -819,7 +900,7 @@
         // Replace the manager ctx.
         runner.manager_ctx = manager_ctx;
 
-        // Run the runer and execute the procedure.
+        // Run the runner and execute the procedure.
         runner.run().await;
         let err = meta.state().error().unwrap().to_string();
         assert!(err.contains("subprocedure failed"), "{err}");
diff --git a/src/common/procedure/src/procedure.rs b/src/common/procedure/src/procedure.rs
index dce404eda6..bba0f1ba32 100644
--- a/src/common/procedure/src/procedure.rs
+++ b/src/common/procedure/src/procedure.rs
@@ -206,6 +206,8 @@ pub enum ProcedureState {
     Running,
     /// The procedure is finished.
     Done,
+    /// The procedure is failed and can be retried.
+    Retrying { error: Arc<Error> },
     /// The procedure is failed and cannot proceed anymore.
     Failed { error: Arc<Error> },
 }
@@ -216,6 +218,11 @@ impl ProcedureState {
         ProcedureState::Failed { error }
     }
 
+    /// Returns a [ProcedureState] with retrying state.
+    pub fn retrying(error: Arc<Error>) -> ProcedureState {
+        ProcedureState::Retrying { error }
+    }
+
     /// Returns true if the procedure state is running.
     pub fn is_running(&self) -> bool {
         matches!(self, ProcedureState::Running)
@@ -231,10 +238,16 @@
         matches!(self, ProcedureState::Failed { .. })
     }
 
+    /// Returns true if the procedure state is retrying.
+    pub fn is_retrying(&self) -> bool {
+        matches!(self, ProcedureState::Retrying { .. })
+    }
+
     /// Returns the error.
     pub fn error(&self) -> Option<&Arc<Error>> {
         match self {
             ProcedureState::Failed { error } => Some(error),
+            ProcedureState::Retrying { error } => Some(error),
             _ => None,
         }
     }
diff --git a/src/common/procedure/src/watcher.rs b/src/common/procedure/src/watcher.rs
index ea1b763268..13d32f73b3 100644
--- a/src/common/procedure/src/watcher.rs
+++ b/src/common/procedure/src/watcher.rs
@@ -33,6 +33,9 @@ pub async fn wait(watcher: &mut Watcher) -> Result<()> {
             ProcedureState::Failed { error } => {
                 return Err(error.clone()).context(ProcedureExecSnafu);
             }
+            ProcedureState::Retrying { error } => {
+                return Err(error.clone()).context(ProcedureExecSnafu);
+            }
         }
     }
 }
diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs
index 729a682d4e..6ba0858957 100644
--- a/src/datanode/src/datanode.rs
+++ b/src/datanode/src/datanode.rs
@@ -149,11 +149,22 @@ impl From<&DatanodeOptions> for StorageEngineConfig {
 pub struct ProcedureConfig {
     /// Storage config for procedure manager.
     pub store: ObjectStoreConfig,
+    /// Max retry times of procedure.
+    pub max_retry_times: usize,
+    /// Initial retry delay of procedures, increases exponentially.
+    #[serde(with = "humantime_serde")]
+    pub retry_delay: Duration,
 }
 
 impl Default for ProcedureConfig {
     fn default() -> ProcedureConfig {
-        ProcedureConfig::from_file_path("/tmp/greptimedb/procedure/".to_string())
+        ProcedureConfig {
+            store: ObjectStoreConfig::File(FileConfig {
+                data_dir: "/tmp/greptimedb/procedure/".to_string(),
+            }),
+            max_retry_times: 3,
+            retry_delay: Duration::from_millis(500),
+        }
     }
 }
 
@@ -161,6 +172,7 @@ impl ProcedureConfig {
     pub fn from_file_path(path: String) -> ProcedureConfig {
         ProcedureConfig {
             store: ObjectStoreConfig::File(FileConfig { data_dir: path }),
+            ..Default::default()
         }
     }
 }
diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs
index ed2faa2f39..b69b7a8ffd 100644
--- a/src/datanode/src/instance.rs
+++ b/src/datanode/src/instance.rs
@@ -460,7 +460,11 @@ pub(crate) async fn create_procedure_manager(
     );
 
     let object_store = new_object_store(&procedure_config.store).await?;
-    let manager_config = ManagerConfig { object_store };
+    let manager_config = ManagerConfig {
+        object_store,
+        max_retry_times: procedure_config.max_retry_times,
+        retry_delay: procedure_config.retry_delay,
+    };
 
     Ok(Some(Arc::new(LocalManager::new(manager_config))))
 }
diff --git a/src/datanode/src/tests/test_util.rs b/src/datanode/src/tests/test_util.rs
index da5f0aea9f..284dd9581b 100644
--- a/src/datanode/src/tests/test_util.rs
+++ b/src/datanode/src/tests/test_util.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 use std::sync::Arc;
+use std::time::Duration;
 
 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
 use common_query::Output;
@@ -64,6 +65,8 @@ impl MockInstance {
             store: ObjectStoreConfig::File(FileConfig {
                 data_dir: procedure_dir.path().to_str().unwrap().to_string(),
             }),
+            max_retry_times: 3,
+            retry_delay: Duration::from_millis(500),
         });
 
         let instance = Instance::with_mock_meta_client(&opts).await.unwrap();
diff --git a/src/table-procedure/src/create.rs b/src/table-procedure/src/create.rs
index 357337ed3f..5a00cec3e0 100644
--- a/src/table-procedure/src/create.rs
+++ b/src/table-procedure/src/create.rs
@@ -197,7 +197,7 @@ impl CreateTableProcedure {
         };
 
         match sub_state {
-            ProcedureState::Running => Ok(Status::Suspended {
+            ProcedureState::Running | ProcedureState::Retrying { .. } => Ok(Status::Suspended {
                 subprocedures: Vec::new(),
                 persist: false,
             }),
diff --git a/src/table-procedure/src/test_util.rs b/src/table-procedure/src/test_util.rs
index c64baab48a..561e186cd9 100644
--- a/src/table-procedure/src/test_util.rs
+++ b/src/table-procedure/src/test_util.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 use std::sync::Arc;
+use std::time::Duration;
 
 use catalog::local::MemoryCatalogManager;
 use catalog::CatalogManagerRef;
@@ -59,7 +60,11 @@ impl TestEnv {
         let accessor = Fs::default().root(&procedure_dir).build().unwrap();
         let object_store = ObjectStore::new(accessor).finish();
 
-        let procedure_manager = Arc::new(LocalManager::new(ManagerConfig { object_store }));
+        let procedure_manager = Arc::new(LocalManager::new(ManagerConfig {
+            object_store,
+            max_retry_times: 3,
+            retry_delay: Duration::from_millis(500),
+        }));
 
         let catalog_manager = Arc::new(MemoryCatalogManager::default());
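Reviewer note (not part of the patch): a minimal, self-contained sketch of the backoff behaviour the runner inherits from `backon`, using only the calls the diff itself relies on (`ExponentialBuilder::default()`, `with_min_delay`, `with_max_times`, `build`, and iterating the built backoff with `next()`). The exact delay values are illustrative, since they depend on `ExponentialBuilder`'s default growth factor; the structure mirrors how `LocalManager` configures the builder from `ManagerConfig { max_retry_times, retry_delay }` and how `Runner::execute_once_with_retry` consumes it.

```rust
use std::time::Duration;

use backon::{BackoffBuilder, ExponentialBuilder};

fn main() {
    // Same shape as the runner's builder: `retry_delay` is the minimum delay,
    // `max_retry_times` caps how many delays the backoff will yield.
    let mut backoff = ExponentialBuilder::default()
        .with_min_delay(Duration::from_millis(500)) // retry_delay
        .with_max_times(3) // max_retry_times
        .build();

    // Each `next()` corresponds to one ExecResult::RetryLater. When it returns
    // `None`, the runner replaces the Retrying state with a terminal Failed
    // state wrapping Error::RetryTimesExceeded.
    let mut attempt = 0;
    while let Some(delay) = backoff.next() {
        attempt += 1;
        println!("retry {attempt}: waiting {} ms", delay.as_millis());
    }
    println!("retries exhausted after {attempt} attempts");
}
```

Running the sketch prints one line per would-be `RetryLater` until the backoff is exhausted, which is the same sequence `test_execute_exceed_max_retry_later` drives through the real `Runner`.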