feat: Add an error variant RetryLater (#1058)

* feat: support retry error

* fix: ci

* fix: ci

* fix: fmt

* feat: add convert procedure error

* Docs : add rustdoc

* fix: cr

* fix: cr

* fix: rm unless code
This commit is contained in:
Xieqijun
2023-02-27 17:19:37 +08:00
committed by GitHub
parent 4b58a8a18d
commit 0b3f955ca7
6 changed files with 115 additions and 11 deletions

View File

@@ -86,6 +86,34 @@ impl StatusCode {
pub fn is_success(code: u32) -> bool {
Self::Success as u32 == code
}
pub fn is_retryable(&self) -> bool {
match self {
StatusCode::StorageUnavailable
| StatusCode::RuntimeResourcesExhausted
| StatusCode::Internal => true,
StatusCode::Success
| StatusCode::Unknown
| StatusCode::Unsupported
| StatusCode::Unexpected
| StatusCode::InvalidArguments
| StatusCode::InvalidSyntax
| StatusCode::PlanQuery
| StatusCode::EngineExecuteQuery
| StatusCode::TableAlreadyExists
| StatusCode::TableNotFound
| StatusCode::TableColumnNotFound
| StatusCode::TableColumnExists
| StatusCode::DatabaseNotFound
| StatusCode::UserNotFound
| StatusCode::UnsupportedPasswordType
| StatusCode::UserPasswordMismatch
| StatusCode::AuthHeaderNotFound
| StatusCode::InvalidAuthHeader
| StatusCode::AccessDenied => false,
}
}
}
impl fmt::Display for StatusCode {

View File

@@ -75,6 +75,12 @@ pub enum Error {
source: serde_json::Error,
backtrace: Backtrace,
},
#[snafu(display("Procedure exec failed, source: {}", source))]
RetryLater {
#[snafu(backtrace)]
source: BoxedError,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -88,7 +94,8 @@ impl ErrorExt for Error {
| Error::DeleteState { .. }
| Error::ListState { .. }
| Error::ReadState { .. }
| Error::FromJson { .. } => StatusCode::Internal,
| Error::FromJson { .. }
| Error::RetryLater { .. } => StatusCode::Internal,
Error::LoaderConflict { .. } | Error::DuplicateProcedure { .. } => {
StatusCode::InvalidArguments
}
@@ -111,4 +118,26 @@ impl Error {
source: BoxedError::new(err),
}
}
/// Creates a new [Error::RetryLater] error from source `err`.
pub fn retry_later<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
Error::RetryLater {
source: BoxedError::new(err),
}
}
/// Determine whether it is a retry later type through [StatusCode]
pub fn is_retry_later(&self) -> bool {
matches!(self, Error::RetryLater { .. })
}
/// Creates a new [Error::RetryLater] or [Error::External] error from source `err` according
/// to its [StatusCode].
pub fn from_error_ext<E: ErrorExt + Send + Sync + 'static>(err: E) -> Self {
if err.status_code().is_retryable() {
Error::retry_later(err)
} else {
Error::external(err)
}
}
}

View File

@@ -43,6 +43,10 @@ impl ExecResult {
matches!(self, ExecResult::Done)
}
fn is_retry_later(&self) -> bool {
matches!(self, ExecResult::RetryLater)
}
fn is_failed(&self) -> bool {
matches!(self, ExecResult::Failed(_))
}
@@ -208,11 +212,16 @@ impl Runner {
Err(e) => {
logging::error!(
e;
"Failed to execute procedure {}-{}",
"Failed to execute procedure {}-{}, retry: {}",
self.procedure.type_name(),
self.meta.id
self.meta.id,
e.is_retry_later(),
);
if e.is_retry_later() {
return ExecResult::RetryLater;
}
self.meta.set_state(ProcedureState::Failed);
// Write rollback key so we can skip this procedure while recovering procedures.
@@ -290,7 +299,7 @@ impl Runner {
self.procedure.type_name(),
self.meta.id,
subprocedure.procedure.type_name(),
subprocedure.id
subprocedure.id,
);
self.submit_subprocedure(subprocedure.id, subprocedure.procedure);
@@ -372,7 +381,7 @@ impl Runner {
logging::info!(
"Procedure {}-{} done",
self.procedure.type_name(),
self.meta.id
self.meta.id,
);
// Mark the state of this procedure to done.
@@ -701,6 +710,45 @@ mod tests {
check_files(&object_store, ctx.procedure_id, &["0000000000.rollback"]).await;
}
#[tokio::test]
async fn test_execute_on_retry_later_error() {
let mut times = 0;
let exec_fn = move |_| {
times += 1;
async move {
if times == 1 {
Err(Error::retry_later(MockError::new(StatusCode::Unexpected)))
} else {
Ok(Status::Done)
}
}
.boxed()
};
let retry_later = ProcedureAdapter {
data: "retry_later".to_string(),
lock_key: LockKey::single("catalog.schema.table"),
exec_fn,
};
let dir = TempDir::new("retry_later").unwrap();
let meta = retry_later.new_meta(ROOT_ID);
let ctx = context_without_provider(meta.id);
let object_store = test_util::new_object_store(&dir);
let procedure_store = ProcedureStore::from(object_store.clone());
let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store);
let res = runner.execute_once(&ctx).await;
assert!(res.is_retry_later(), "{res:?}");
assert_eq!(ProcedureState::Running, meta.state());
let res = runner.execute_once(&ctx).await;
assert!(res.is_done(), "{res:?}");
assert_eq!(ProcedureState::Done, meta.state());
check_files(&object_store, ctx.procedure_id, &["0000000000.commit"]).await;
}
#[tokio::test]
async fn test_child_error() {
let mut times = 0;
@@ -733,7 +781,7 @@ mod tests {
let state = ctx.provider.procedure_state(child_id).await.unwrap();
if state == Some(ProcedureState::Failed) {
// The parent procedure to abort itself if child procedure is failed.
Err(Error::external(PlainError::new(
Err(Error::from_error_ext(PlainError::new(
"subprocedure failed".to_string(),
StatusCode::Unexpected,
)))

View File

@@ -187,13 +187,12 @@ impl<S: StorageEngine> CreateMitoTable<S> {
}
let region_name = engine::region_name(self.data.request.id, *number);
// TODO(yingwen): Most error is recoverable.
if let Some(region) = self
.engine_inner
.storage_engine
.open_region(&engine_ctx, &region_name, &open_opts)
.await
.map_err(Error::external)?
.map_err(Error::from_error_ext)?
{
// Region already exists.
self.regions.insert(*number, region);
@@ -218,7 +217,7 @@ impl<S: StorageEngine> CreateMitoTable<S> {
.storage_engine
.create_region(&engine_ctx, region_desc, &create_opts)
.await
.map_err(Error::external)?;
.map_err(Error::from_error_ext)?;
self.regions.insert(*number, region);
}

View File

@@ -232,7 +232,7 @@ impl ErrorExt for Error {
impl From<Error> for common_procedure::Error {
fn from(e: Error) -> common_procedure::Error {
common_procedure::Error::external(e)
common_procedure::Error::from_error_ext(e)
}
}

View File

@@ -84,6 +84,6 @@ impl ErrorExt for Error {
impl From<Error> for common_procedure::Error {
fn from(e: Error) -> common_procedure::Error {
common_procedure::Error::external(e)
common_procedure::Error::from_error_ext(e)
}
}