mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-03 20:02:54 +00:00
fix: remove poison key before retrying procedure on retryable errors (#6189)
* fix(meta): remove poison key before retrying procedure on retriable errors * refactor: enhance error handling in DDL procedures
This commit is contained in:
@@ -55,6 +55,7 @@ impl Datanode for RegionRequester {
|
||||
if err.should_retry() {
|
||||
meta_error::Error::RetryLater {
|
||||
source: BoxedError::new(err),
|
||||
clean_poisons: false,
|
||||
}
|
||||
} else {
|
||||
meta_error::Error::External {
|
||||
|
||||
@@ -21,7 +21,7 @@ use snafu::{ensure, ResultExt};
|
||||
use strum::AsRefStr;
|
||||
|
||||
use crate::cache_invalidator::Context;
|
||||
use crate::ddl::utils::handle_retry_error;
|
||||
use crate::ddl::utils::map_to_procedure_error;
|
||||
use crate::ddl::DdlContext;
|
||||
use crate::error::{Result, SchemaNotFoundSnafu};
|
||||
use crate::instruction::CacheIdent;
|
||||
@@ -148,7 +148,7 @@ impl Procedure for AlterDatabaseProcedure {
|
||||
AlterDatabaseState::UpdateMetadata => self.on_update_metadata().await,
|
||||
AlterDatabaseState::InvalidateSchemaCache => self.on_invalidate_schema_cache().await,
|
||||
}
|
||||
.map_err(handle_retry_error)
|
||||
.map_err(map_to_procedure_error)
|
||||
}
|
||||
|
||||
fn dump(&self) -> ProcedureResult<String> {
|
||||
|
||||
@@ -32,9 +32,11 @@ use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY;
|
||||
use strum::AsRefStr;
|
||||
use table::metadata::TableId;
|
||||
|
||||
use crate::ddl::utils::{add_peer_context_if_needed, sync_follower_regions};
|
||||
use crate::ddl::utils::{
|
||||
add_peer_context_if_needed, map_to_procedure_error, sync_follower_regions,
|
||||
};
|
||||
use crate::ddl::DdlContext;
|
||||
use crate::error::{DecodeJsonSnafu, Error, MetadataCorruptionSnafu, Result};
|
||||
use crate::error::{DecodeJsonSnafu, MetadataCorruptionSnafu, Result};
|
||||
use crate::key::table_info::TableInfoValue;
|
||||
use crate::key::table_route::PhysicalTableRouteValue;
|
||||
use crate::key::DeserializedValueWithBytes;
|
||||
@@ -217,14 +219,6 @@ impl Procedure for AlterLogicalTablesProcedure {
|
||||
}
|
||||
|
||||
async fn execute(&mut self, _ctx: &Context) -> ProcedureResult<Status> {
|
||||
let error_handler = |e: Error| {
|
||||
if e.is_retry_later() {
|
||||
common_procedure::Error::retry_later(e)
|
||||
} else {
|
||||
common_procedure::Error::external(e)
|
||||
}
|
||||
};
|
||||
|
||||
let state = &self.data.state;
|
||||
|
||||
let step = state.as_ref();
|
||||
@@ -241,7 +235,7 @@ impl Procedure for AlterLogicalTablesProcedure {
|
||||
AlterTablesState::UpdateMetadata => self.on_update_metadata().await,
|
||||
AlterTablesState::InvalidateTableCache => self.on_invalidate_table_cache().await,
|
||||
}
|
||||
.map_err(error_handler)
|
||||
.map_err(map_to_procedure_error)
|
||||
}
|
||||
|
||||
fn dump(&self) -> ProcedureResult<String> {
|
||||
|
||||
@@ -40,10 +40,11 @@ use table::table_reference::TableReference;
|
||||
|
||||
use crate::cache_invalidator::Context;
|
||||
use crate::ddl::utils::{
|
||||
add_peer_context_if_needed, handle_multiple_results, sync_follower_regions, MultipleResults,
|
||||
add_peer_context_if_needed, handle_multiple_results, map_to_procedure_error,
|
||||
sync_follower_regions, MultipleResults,
|
||||
};
|
||||
use crate::ddl::DdlContext;
|
||||
use crate::error::{AbortProcedureSnafu, Error, NoLeaderSnafu, PutPoisonSnafu, Result};
|
||||
use crate::error::{AbortProcedureSnafu, NoLeaderSnafu, PutPoisonSnafu, Result, RetryLaterSnafu};
|
||||
use crate::instruction::CacheIdent;
|
||||
use crate::key::table_info::TableInfoValue;
|
||||
use crate::key::{DeserializedValueWithBytes, RegionDistribution};
|
||||
@@ -195,7 +196,10 @@ impl AlterTableProcedure {
|
||||
}
|
||||
MultipleResults::AllRetryable(error) => {
|
||||
// Just returns the error, and wait for the next try.
|
||||
Err(error)
|
||||
let err = BoxedError::new(error);
|
||||
Err(err).context(RetryLaterSnafu {
|
||||
clean_poisons: true,
|
||||
})
|
||||
}
|
||||
MultipleResults::Ok(results) => {
|
||||
self.submit_sync_region_requests(results, &physical_table_route.region_routes)
|
||||
@@ -323,16 +327,6 @@ impl Procedure for AlterTableProcedure {
|
||||
}
|
||||
|
||||
async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
|
||||
let error_handler = |e: Error| {
|
||||
if e.is_retry_later() {
|
||||
ProcedureError::retry_later(e)
|
||||
} else if e.need_clean_poisons() {
|
||||
ProcedureError::external_and_clean_poisons(e)
|
||||
} else {
|
||||
ProcedureError::external(e)
|
||||
}
|
||||
};
|
||||
|
||||
let state = &self.data.state;
|
||||
|
||||
let step = state.as_ref();
|
||||
@@ -350,7 +344,7 @@ impl Procedure for AlterTableProcedure {
|
||||
AlterTableState::UpdateMetadata => self.on_update_metadata().await,
|
||||
AlterTableState::InvalidateTableCache => self.on_broadcast().await,
|
||||
}
|
||||
.map_err(error_handler)
|
||||
.map_err(map_to_procedure_error)
|
||||
}
|
||||
|
||||
fn dump(&self) -> ProcedureResult<String> {
|
||||
|
||||
@@ -22,7 +22,7 @@ use serde_with::{serde_as, DefaultOnNull};
|
||||
use snafu::{ensure, ResultExt};
|
||||
use strum::AsRefStr;
|
||||
|
||||
use crate::ddl::utils::handle_retry_error;
|
||||
use crate::ddl::utils::map_to_procedure_error;
|
||||
use crate::ddl::DdlContext;
|
||||
use crate::error::{self, Result};
|
||||
use crate::key::schema_name::{SchemaNameKey, SchemaNameValue};
|
||||
@@ -115,7 +115,7 @@ impl Procedure for CreateDatabaseProcedure {
|
||||
CreateDatabaseState::Prepare => self.on_prepare().await,
|
||||
CreateDatabaseState::CreateMetadata => self.on_create_metadata().await,
|
||||
}
|
||||
.map_err(handle_retry_error)
|
||||
.map_err(map_to_procedure_error)
|
||||
}
|
||||
|
||||
fn dump(&self) -> ProcedureResult<String> {
|
||||
|
||||
@@ -36,7 +36,7 @@ use strum::AsRefStr;
|
||||
use table::metadata::TableId;
|
||||
|
||||
use crate::cache_invalidator::Context;
|
||||
use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error};
|
||||
use crate::ddl::utils::{add_peer_context_if_needed, map_to_procedure_error};
|
||||
use crate::ddl::DdlContext;
|
||||
use crate::error::{self, Result, UnexpectedSnafu};
|
||||
use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
|
||||
@@ -304,7 +304,7 @@ impl Procedure for CreateFlowProcedure {
|
||||
CreateFlowState::CreateMetadata => self.on_create_metadata().await,
|
||||
CreateFlowState::InvalidateFlowCache => self.on_broadcast().await,
|
||||
}
|
||||
.map_err(handle_retry_error)
|
||||
.map_err(map_to_procedure_error)
|
||||
}
|
||||
|
||||
fn dump(&self) -> ProcedureResult<String> {
|
||||
|
||||
@@ -33,7 +33,9 @@ use store_api::storage::{RegionId, RegionNumber};
|
||||
use strum::AsRefStr;
|
||||
use table::metadata::{RawTableInfo, TableId};
|
||||
|
||||
use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error, sync_follower_regions};
|
||||
use crate::ddl::utils::{
|
||||
add_peer_context_if_needed, map_to_procedure_error, sync_follower_regions,
|
||||
};
|
||||
use crate::ddl::DdlContext;
|
||||
use crate::error::{DecodeJsonSnafu, MetadataCorruptionSnafu, Result};
|
||||
use crate::key::table_route::TableRouteValue;
|
||||
@@ -238,7 +240,7 @@ impl Procedure for CreateLogicalTablesProcedure {
|
||||
CreateTablesState::DatanodeCreateRegions => self.on_datanode_create_regions().await,
|
||||
CreateTablesState::CreateMetadata => self.on_create_metadata().await,
|
||||
}
|
||||
.map_err(handle_retry_error)
|
||||
.map_err(map_to_procedure_error)
|
||||
}
|
||||
|
||||
fn dump(&self) -> ProcedureResult<String> {
|
||||
|
||||
@@ -34,7 +34,7 @@ use table::table_reference::TableReference;
|
||||
|
||||
use crate::ddl::create_table_template::{build_template, CreateRequestBuilder};
|
||||
use crate::ddl::utils::{
|
||||
add_peer_context_if_needed, convert_region_routes_to_detecting_regions, handle_retry_error,
|
||||
add_peer_context_if_needed, convert_region_routes_to_detecting_regions, map_to_procedure_error,
|
||||
region_storage_path,
|
||||
};
|
||||
use crate::ddl::{DdlContext, TableMetadata};
|
||||
@@ -319,7 +319,7 @@ impl Procedure for CreateTableProcedure {
|
||||
CreateTableState::DatanodeCreateRegions => self.on_datanode_create_regions().await,
|
||||
CreateTableState::CreateMetadata => self.on_create_metadata().await,
|
||||
}
|
||||
.map_err(handle_retry_error)
|
||||
.map_err(map_to_procedure_error)
|
||||
}
|
||||
|
||||
fn dump(&self) -> ProcedureResult<String> {
|
||||
|
||||
@@ -23,7 +23,7 @@ use table::metadata::{RawTableInfo, TableId, TableType};
|
||||
use table::table_reference::TableReference;
|
||||
|
||||
use crate::cache_invalidator::Context;
|
||||
use crate::ddl::utils::handle_retry_error;
|
||||
use crate::ddl::utils::map_to_procedure_error;
|
||||
use crate::ddl::{DdlContext, TableMetadata};
|
||||
use crate::error::{self, Result};
|
||||
use crate::instruction::CacheIdent;
|
||||
@@ -249,7 +249,7 @@ impl Procedure for CreateViewProcedure {
|
||||
CreateViewState::Prepare => self.on_prepare().await,
|
||||
CreateViewState::CreateMetadata => self.on_create_metadata(ctx).await,
|
||||
}
|
||||
.map_err(handle_retry_error)
|
||||
.map_err(map_to_procedure_error)
|
||||
}
|
||||
|
||||
fn dump(&self) -> ProcedureResult<String> {
|
||||
|
||||
@@ -21,7 +21,7 @@ use std::any::Any;
|
||||
use std::fmt::Debug;
|
||||
|
||||
use common_error::ext::BoxedError;
|
||||
use common_procedure::error::{Error as ProcedureError, ExternalSnafu, FromJsonSnafu, ToJsonSnafu};
|
||||
use common_procedure::error::{ExternalSnafu, FromJsonSnafu, ToJsonSnafu};
|
||||
use common_procedure::{
|
||||
Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
|
||||
};
|
||||
@@ -31,6 +31,7 @@ use snafu::ResultExt;
|
||||
use tonic::async_trait;
|
||||
|
||||
use self::start::DropDatabaseStart;
|
||||
use crate::ddl::utils::map_to_procedure_error;
|
||||
use crate::ddl::DdlContext;
|
||||
use crate::error::Result;
|
||||
use crate::key::table_name::TableNameValue;
|
||||
@@ -141,13 +142,7 @@ impl Procedure for DropDatabaseProcedure {
|
||||
let (next, status) = state
|
||||
.next(&self.runtime_context, &mut self.context)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
if e.is_retry_later() {
|
||||
ProcedureError::retry_later(e)
|
||||
} else {
|
||||
ProcedureError::external(e)
|
||||
}
|
||||
})?;
|
||||
.map_err(map_to_procedure_error)?;
|
||||
|
||||
*state = next;
|
||||
Ok(status)
|
||||
|
||||
@@ -323,6 +323,7 @@ mod tests {
|
||||
}
|
||||
.build(),
|
||||
),
|
||||
clean_poisons: false,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -30,7 +30,7 @@ use snafu::{ensure, ResultExt};
|
||||
use strum::AsRefStr;
|
||||
|
||||
use crate::cache_invalidator::Context;
|
||||
use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error};
|
||||
use crate::ddl::utils::{add_peer_context_if_needed, map_to_procedure_error};
|
||||
use crate::ddl::DdlContext;
|
||||
use crate::error::{self, Result};
|
||||
use crate::flow_name::FlowName;
|
||||
@@ -201,7 +201,7 @@ impl Procedure for DropFlowProcedure {
|
||||
DropFlowState::InvalidateFlowCache => self.on_broadcast().await,
|
||||
DropFlowState::DropFlows => self.on_flownode_drop_flows().await,
|
||||
}
|
||||
.map_err(handle_retry_error)
|
||||
.map_err(map_to_procedure_error)
|
||||
}
|
||||
|
||||
fn dump(&self) -> ProcedureResult<String> {
|
||||
|
||||
@@ -35,7 +35,7 @@ use table::metadata::TableId;
|
||||
use table::table_reference::TableReference;
|
||||
|
||||
use self::executor::DropTableExecutor;
|
||||
use crate::ddl::utils::handle_retry_error;
|
||||
use crate::ddl::utils::map_to_procedure_error;
|
||||
use crate::ddl::DdlContext;
|
||||
use crate::error::{self, Result};
|
||||
use crate::key::table_route::TableRouteValue;
|
||||
@@ -221,7 +221,7 @@ impl Procedure for DropTableProcedure {
|
||||
DropTableState::DatanodeDropRegions => self.on_datanode_drop_regions().await,
|
||||
DropTableState::DeleteTombstone => self.on_delete_metadata_tombstone().await,
|
||||
}
|
||||
.map_err(handle_retry_error)
|
||||
.map_err(map_to_procedure_error)
|
||||
}
|
||||
|
||||
fn dump(&self) -> ProcedureResult<String> {
|
||||
|
||||
@@ -25,7 +25,7 @@ use table::metadata::{RawTableInfo, TableId, TableType};
|
||||
use table::table_reference::TableReference;
|
||||
|
||||
use crate::cache_invalidator::Context;
|
||||
use crate::ddl::utils::handle_retry_error;
|
||||
use crate::ddl::utils::map_to_procedure_error;
|
||||
use crate::ddl::DdlContext;
|
||||
use crate::error::{self, Result};
|
||||
use crate::instruction::CacheIdent;
|
||||
@@ -191,7 +191,7 @@ impl Procedure for DropViewProcedure {
|
||||
DropViewState::DeleteMetadata => self.on_delete_metadata().await,
|
||||
DropViewState::InvalidateViewCache => self.on_broadcast().await,
|
||||
}
|
||||
.map_err(handle_retry_error)
|
||||
.map_err(map_to_procedure_error)
|
||||
}
|
||||
|
||||
fn dump(&self) -> ProcedureResult<String> {
|
||||
|
||||
@@ -105,6 +105,7 @@ impl MockDatanodeHandler for RetryErrorDatanodeHandler {
|
||||
}
|
||||
.build(),
|
||||
),
|
||||
clean_poisons: false,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -218,6 +219,7 @@ impl MockDatanodeHandler for PartialSuccessDatanodeHandler {
|
||||
}
|
||||
.build(),
|
||||
),
|
||||
clean_poisons: false,
|
||||
})
|
||||
} else {
|
||||
error::UnexpectedSnafu {
|
||||
@@ -252,6 +254,7 @@ impl MockDatanodeHandler for AllFailureDatanodeHandler {
|
||||
}
|
||||
.build(),
|
||||
),
|
||||
clean_poisons: false,
|
||||
})
|
||||
} else {
|
||||
error::UnexpectedSnafu {
|
||||
|
||||
@@ -575,6 +575,7 @@ async fn test_on_submit_alter_request_with_partial_success_retryable() {
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert!(result.is_retry_later());
|
||||
assert!(!result.need_clean_poisons());
|
||||
|
||||
// Submits again
|
||||
let result = procedure
|
||||
@@ -582,6 +583,7 @@ async fn test_on_submit_alter_request_with_partial_success_retryable() {
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert!(result.is_retry_later());
|
||||
assert!(!result.need_clean_poisons());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -618,12 +620,14 @@ async fn test_on_submit_alter_request_with_all_failure_retrybale() {
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert!(err.is_retry_later());
|
||||
assert!(err.need_clean_poisons());
|
||||
// submits again
|
||||
let err = procedure
|
||||
.submit_alter_region_requests(procedure_id, provider.as_ref())
|
||||
.await
|
||||
.unwrap_err();
|
||||
assert!(err.is_retry_later());
|
||||
assert!(err.need_clean_poisons());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
@@ -31,7 +31,7 @@ use table::metadata::{RawTableInfo, TableId};
|
||||
use table::table_name::TableName;
|
||||
use table::table_reference::TableReference;
|
||||
|
||||
use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error};
|
||||
use crate::ddl::utils::{add_peer_context_if_needed, map_to_procedure_error};
|
||||
use crate::ddl::DdlContext;
|
||||
use crate::error::{Result, TableNotFoundSnafu};
|
||||
use crate::key::table_info::TableInfoValue;
|
||||
@@ -66,7 +66,7 @@ impl Procedure for TruncateTableProcedure {
|
||||
self.on_datanode_truncate_regions().await
|
||||
}
|
||||
}
|
||||
.map_err(handle_retry_error)
|
||||
.map_err(map_to_procedure_error)
|
||||
}
|
||||
|
||||
fn dump(&self) -> ProcedureResult<String> {
|
||||
|
||||
@@ -60,11 +60,16 @@ pub fn add_peer_context_if_needed(datanode: Peer) -> impl FnOnce(Error) -> Error
|
||||
}
|
||||
}
|
||||
|
||||
pub fn handle_retry_error(e: Error) -> ProcedureError {
|
||||
if e.is_retry_later() {
|
||||
ProcedureError::retry_later(e)
|
||||
} else {
|
||||
ProcedureError::external(e)
|
||||
/// Maps the error to the corresponding procedure error.
|
||||
///
|
||||
/// This function determines whether the error should be retried and if poison cleanup is needed,
|
||||
/// then maps it to the appropriate procedure error variant.
|
||||
pub fn map_to_procedure_error(e: Error) -> ProcedureError {
|
||||
match (e.is_retry_later(), e.need_clean_poisons()) {
|
||||
(true, true) => ProcedureError::retry_later_and_clean_poisons(e),
|
||||
(true, false) => ProcedureError::retry_later(e),
|
||||
(false, true) => ProcedureError::external_and_clean_poisons(e),
|
||||
(false, false) => ProcedureError::external(e),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -454,7 +454,10 @@ pub enum Error {
|
||||
},
|
||||
|
||||
#[snafu(display("Retry later"))]
|
||||
RetryLater { source: BoxedError },
|
||||
RetryLater {
|
||||
source: BoxedError,
|
||||
clean_poisons: bool,
|
||||
},
|
||||
|
||||
#[snafu(display("Abort procedure"))]
|
||||
AbortProcedure {
|
||||
@@ -1039,6 +1042,7 @@ impl Error {
|
||||
pub fn retry_later<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
|
||||
Error::RetryLater {
|
||||
source: BoxedError::new(err),
|
||||
clean_poisons: false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1049,7 +1053,13 @@ impl Error {
|
||||
|
||||
/// Determine whether it needs to clean poisons.
|
||||
pub fn need_clean_poisons(&self) -> bool {
|
||||
matches!(self, Error::AbortProcedure { clean_poisons, .. } if *clean_poisons)
|
||||
matches!(
|
||||
self,
|
||||
Error::AbortProcedure { clean_poisons, .. } if *clean_poisons
|
||||
) || matches!(
|
||||
self,
|
||||
Error::RetryLater { clean_poisons, .. } if *clean_poisons
|
||||
)
|
||||
}
|
||||
|
||||
/// Returns true if the response exceeds the size limit.
|
||||
|
||||
@@ -138,7 +138,10 @@ pub enum Error {
|
||||
},
|
||||
|
||||
#[snafu(display("Procedure exec failed"))]
|
||||
RetryLater { source: BoxedError },
|
||||
RetryLater {
|
||||
source: BoxedError,
|
||||
clean_poisons: bool,
|
||||
},
|
||||
|
||||
#[snafu(display("Procedure panics, procedure_id: {}", procedure_id))]
|
||||
ProcedurePanic { procedure_id: ProcedureId },
|
||||
@@ -298,6 +301,15 @@ impl Error {
|
||||
pub fn retry_later<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
|
||||
Error::RetryLater {
|
||||
source: BoxedError::new(err),
|
||||
clean_poisons: false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new [Error::RetryLater] error from source `err` and clean poisons.
|
||||
pub fn retry_later_and_clean_poisons<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
|
||||
Error::RetryLater {
|
||||
source: BoxedError::new(err),
|
||||
clean_poisons: true,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -309,6 +321,7 @@ impl Error {
|
||||
/// Determine whether it needs to clean poisons.
|
||||
pub fn need_clean_poisons(&self) -> bool {
|
||||
matches!(self, Error::External { clean_poisons, .. } if *clean_poisons)
|
||||
|| matches!(self, Error::RetryLater { clean_poisons, .. } if *clean_poisons)
|
||||
}
|
||||
|
||||
/// Creates a new [Error::RetryLater] or [Error::External] error from source `err` according
|
||||
|
||||
@@ -358,10 +358,11 @@ impl Runner {
|
||||
Err(e) => {
|
||||
error!(
|
||||
e;
|
||||
"Failed to execute procedure {}-{}, retry: {}",
|
||||
"Failed to execute procedure {}-{}, retry: {}, clean_poisons: {}",
|
||||
self.procedure.type_name(),
|
||||
self.meta.id,
|
||||
e.is_retry_later(),
|
||||
e.need_clean_poisons(),
|
||||
);
|
||||
|
||||
// Don't store state if `ProcedureManager` is stopped.
|
||||
@@ -378,6 +379,11 @@ impl Runner {
|
||||
self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
|
||||
return;
|
||||
}
|
||||
debug!(
|
||||
"Procedure {}-{} cleaned poisons",
|
||||
self.procedure.type_name(),
|
||||
self.meta.id,
|
||||
);
|
||||
}
|
||||
|
||||
if e.is_retry_later() {
|
||||
@@ -581,6 +587,7 @@ impl Runner {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::assert_matches::assert_matches;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -1598,6 +1605,75 @@ mod tests {
|
||||
assert_eq!(&procedure_id.to_string(), ROOT_ID);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_execute_exceed_max_retry_after_set_poison() {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let mut times = 0;
|
||||
let poison_key = PoisonKey::new("table/1024");
|
||||
let moved_poison_key = poison_key.clone();
|
||||
let exec_fn = move |ctx: Context| {
|
||||
times += 1;
|
||||
let poison_key = moved_poison_key.clone();
|
||||
async move {
|
||||
if times == 1 {
|
||||
Ok(Status::executing(true))
|
||||
} else {
|
||||
// Put the poison to the context.
|
||||
ctx.provider
|
||||
.try_put_poison(&poison_key, ctx.procedure_id)
|
||||
.await
|
||||
.unwrap();
|
||||
Err(Error::retry_later_and_clean_poisons(MockError::new(
|
||||
StatusCode::Unexpected,
|
||||
)))
|
||||
}
|
||||
}
|
||||
.boxed()
|
||||
};
|
||||
let poison = ProcedureAdapter {
|
||||
data: "poison".to_string(),
|
||||
lock_key: LockKey::single_exclusive("catalog.schema.table"),
|
||||
poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
|
||||
exec_fn,
|
||||
rollback_fn: None,
|
||||
};
|
||||
|
||||
let dir = create_temp_dir("exceed_max_after_set_poison");
|
||||
let meta = poison.new_meta(ROOT_ID);
|
||||
let object_store = test_util::new_object_store(&dir);
|
||||
let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
|
||||
let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store);
|
||||
runner.manager_ctx.start();
|
||||
runner.exponential_builder = ExponentialBuilder::default()
|
||||
.with_min_delay(Duration::from_millis(1))
|
||||
.with_max_times(3);
|
||||
// Use the manager ctx as the context provider.
|
||||
let ctx = context_with_provider(
|
||||
meta.id,
|
||||
runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
|
||||
);
|
||||
// Manually add this procedure to the manager ctx.
|
||||
runner
|
||||
.manager_ctx
|
||||
.procedures
|
||||
.write()
|
||||
.unwrap()
|
||||
.insert(meta.id, runner.meta.clone());
|
||||
// Run the runner and execute the procedure.
|
||||
runner.execute_once_with_retry(&ctx).await;
|
||||
let err = meta.state().error().unwrap().clone();
|
||||
assert_matches!(&*err, Error::RetryTimesExceeded { .. });
|
||||
|
||||
// Check the poison is deleted.
|
||||
let procedure_id = runner
|
||||
.manager_ctx
|
||||
.poison_manager
|
||||
.get_poison(&poison_key.to_string())
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(procedure_id, None);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_execute_poisoned() {
|
||||
let mut times = 0;
|
||||
|
||||
Reference in New Issue
Block a user