diff --git a/src/common/error/src/status_code.rs b/src/common/error/src/status_code.rs index d58c1696c2..baf366ae9f 100644 --- a/src/common/error/src/status_code.rs +++ b/src/common/error/src/status_code.rs @@ -86,6 +86,34 @@ impl StatusCode { pub fn is_success(code: u32) -> bool { Self::Success as u32 == code } + + pub fn is_retryable(&self) -> bool { + match self { + StatusCode::StorageUnavailable + | StatusCode::RuntimeResourcesExhausted + | StatusCode::Internal => true, + + StatusCode::Success + | StatusCode::Unknown + | StatusCode::Unsupported + | StatusCode::Unexpected + | StatusCode::InvalidArguments + | StatusCode::InvalidSyntax + | StatusCode::PlanQuery + | StatusCode::EngineExecuteQuery + | StatusCode::TableAlreadyExists + | StatusCode::TableNotFound + | StatusCode::TableColumnNotFound + | StatusCode::TableColumnExists + | StatusCode::DatabaseNotFound + | StatusCode::UserNotFound + | StatusCode::UnsupportedPasswordType + | StatusCode::UserPasswordMismatch + | StatusCode::AuthHeaderNotFound + | StatusCode::InvalidAuthHeader + | StatusCode::AccessDenied => false, + } + } } impl fmt::Display for StatusCode { diff --git a/src/common/procedure/src/error.rs b/src/common/procedure/src/error.rs index c1b8ad2ddd..8b048a3f3c 100644 --- a/src/common/procedure/src/error.rs +++ b/src/common/procedure/src/error.rs @@ -75,6 +75,12 @@ pub enum Error { source: serde_json::Error, backtrace: Backtrace, }, + + #[snafu(display("Procedure exec failed, source: {}", source))] + RetryLater { + #[snafu(backtrace)] + source: BoxedError, + }, } pub type Result = std::result::Result; @@ -88,7 +94,8 @@ impl ErrorExt for Error { | Error::DeleteState { .. } | Error::ListState { .. } | Error::ReadState { .. } - | Error::FromJson { .. } => StatusCode::Internal, + | Error::FromJson { .. } + | Error::RetryLater { .. } => StatusCode::Internal, Error::LoaderConflict { .. } | Error::DuplicateProcedure { .. } => { StatusCode::InvalidArguments } @@ -111,4 +118,26 @@ impl Error { source: BoxedError::new(err), } } + + /// Creates a new [Error::RetryLater] error from source `err`. + pub fn retry_later(err: E) -> Error { + Error::RetryLater { + source: BoxedError::new(err), + } + } + + /// Determine whether it is a retry later type through [StatusCode] + pub fn is_retry_later(&self) -> bool { + matches!(self, Error::RetryLater { .. }) + } + + /// Creates a new [Error::RetryLater] or [Error::External] error from source `err` according + /// to its [StatusCode]. + pub fn from_error_ext(err: E) -> Self { + if err.status_code().is_retryable() { + Error::retry_later(err) + } else { + Error::external(err) + } + } } diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs index a5c19b0dc3..233b2e665a 100644 --- a/src/common/procedure/src/local/runner.rs +++ b/src/common/procedure/src/local/runner.rs @@ -43,6 +43,10 @@ impl ExecResult { matches!(self, ExecResult::Done) } + fn is_retry_later(&self) -> bool { + matches!(self, ExecResult::RetryLater) + } + fn is_failed(&self) -> bool { matches!(self, ExecResult::Failed(_)) } @@ -208,11 +212,16 @@ impl Runner { Err(e) => { logging::error!( e; - "Failed to execute procedure {}-{}", + "Failed to execute procedure {}-{}, retry: {}", self.procedure.type_name(), - self.meta.id + self.meta.id, + e.is_retry_later(), ); + if e.is_retry_later() { + return ExecResult::RetryLater; + } + self.meta.set_state(ProcedureState::Failed); // Write rollback key so we can skip this procedure while recovering procedures. @@ -290,7 +299,7 @@ impl Runner { self.procedure.type_name(), self.meta.id, subprocedure.procedure.type_name(), - subprocedure.id + subprocedure.id, ); self.submit_subprocedure(subprocedure.id, subprocedure.procedure); @@ -372,7 +381,7 @@ impl Runner { logging::info!( "Procedure {}-{} done", self.procedure.type_name(), - self.meta.id + self.meta.id, ); // Mark the state of this procedure to done. @@ -701,6 +710,45 @@ mod tests { check_files(&object_store, ctx.procedure_id, &["0000000000.rollback"]).await; } + #[tokio::test] + async fn test_execute_on_retry_later_error() { + let mut times = 0; + + let exec_fn = move |_| { + times += 1; + async move { + if times == 1 { + Err(Error::retry_later(MockError::new(StatusCode::Unexpected))) + } else { + Ok(Status::Done) + } + } + .boxed() + }; + + let retry_later = ProcedureAdapter { + data: "retry_later".to_string(), + lock_key: LockKey::single("catalog.schema.table"), + exec_fn, + }; + + let dir = TempDir::new("retry_later").unwrap(); + let meta = retry_later.new_meta(ROOT_ID); + let ctx = context_without_provider(meta.id); + let object_store = test_util::new_object_store(&dir); + let procedure_store = ProcedureStore::from(object_store.clone()); + let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store); + + let res = runner.execute_once(&ctx).await; + assert!(res.is_retry_later(), "{res:?}"); + assert_eq!(ProcedureState::Running, meta.state()); + + let res = runner.execute_once(&ctx).await; + assert!(res.is_done(), "{res:?}"); + assert_eq!(ProcedureState::Done, meta.state()); + check_files(&object_store, ctx.procedure_id, &["0000000000.commit"]).await; + } + #[tokio::test] async fn test_child_error() { let mut times = 0; @@ -733,7 +781,7 @@ mod tests { let state = ctx.provider.procedure_state(child_id).await.unwrap(); if state == Some(ProcedureState::Failed) { // The parent procedure to abort itself if child procedure is failed. - Err(Error::external(PlainError::new( + Err(Error::from_error_ext(PlainError::new( "subprocedure failed".to_string(), StatusCode::Unexpected, ))) diff --git a/src/mito/src/engine/procedure/create.rs b/src/mito/src/engine/procedure/create.rs index c249ff619a..35bce2e3dc 100644 --- a/src/mito/src/engine/procedure/create.rs +++ b/src/mito/src/engine/procedure/create.rs @@ -187,13 +187,12 @@ impl CreateMitoTable { } let region_name = engine::region_name(self.data.request.id, *number); - // TODO(yingwen): Most error is recoverable. if let Some(region) = self .engine_inner .storage_engine .open_region(&engine_ctx, ®ion_name, &open_opts) .await - .map_err(Error::external)? + .map_err(Error::from_error_ext)? { // Region already exists. self.regions.insert(*number, region); @@ -218,7 +217,7 @@ impl CreateMitoTable { .storage_engine .create_region(&engine_ctx, region_desc, &create_opts) .await - .map_err(Error::external)?; + .map_err(Error::from_error_ext)?; self.regions.insert(*number, region); } diff --git a/src/mito/src/error.rs b/src/mito/src/error.rs index 56409c2901..55446db42a 100644 --- a/src/mito/src/error.rs +++ b/src/mito/src/error.rs @@ -232,7 +232,7 @@ impl ErrorExt for Error { impl From for common_procedure::Error { fn from(e: Error) -> common_procedure::Error { - common_procedure::Error::external(e) + common_procedure::Error::from_error_ext(e) } } diff --git a/src/table-procedure/src/error.rs b/src/table-procedure/src/error.rs index 582467b011..371c81fa9c 100644 --- a/src/table-procedure/src/error.rs +++ b/src/table-procedure/src/error.rs @@ -84,6 +84,6 @@ impl ErrorExt for Error { impl From for common_procedure::Error { fn from(e: Error) -> common_procedure::Error { - common_procedure::Error::external(e) + common_procedure::Error::from_error_ext(e) } }