diff --git a/src/client/src/region.rs b/src/client/src/region.rs index 1c50c08625..e2016deab5 100644 --- a/src/client/src/region.rs +++ b/src/client/src/region.rs @@ -55,6 +55,7 @@ impl Datanode for RegionRequester { if err.should_retry() { meta_error::Error::RetryLater { source: BoxedError::new(err), + clean_poisons: false, } } else { meta_error::Error::External { diff --git a/src/common/meta/src/ddl/alter_database.rs b/src/common/meta/src/ddl/alter_database.rs index 57987da4f4..077f31126a 100644 --- a/src/common/meta/src/ddl/alter_database.rs +++ b/src/common/meta/src/ddl/alter_database.rs @@ -21,7 +21,7 @@ use snafu::{ensure, ResultExt}; use strum::AsRefStr; use crate::cache_invalidator::Context; -use crate::ddl::utils::handle_retry_error; +use crate::ddl::utils::map_to_procedure_error; use crate::ddl::DdlContext; use crate::error::{Result, SchemaNotFoundSnafu}; use crate::instruction::CacheIdent; @@ -148,7 +148,7 @@ impl Procedure for AlterDatabaseProcedure { AlterDatabaseState::UpdateMetadata => self.on_update_metadata().await, AlterDatabaseState::InvalidateSchemaCache => self.on_invalidate_schema_cache().await, } - .map_err(handle_retry_error) + .map_err(map_to_procedure_error) } fn dump(&self) -> ProcedureResult { diff --git a/src/common/meta/src/ddl/alter_logical_tables.rs b/src/common/meta/src/ddl/alter_logical_tables.rs index 74706a8ddb..37dc76c4ed 100644 --- a/src/common/meta/src/ddl/alter_logical_tables.rs +++ b/src/common/meta/src/ddl/alter_logical_tables.rs @@ -32,9 +32,11 @@ use store_api::metric_engine_consts::ALTER_PHYSICAL_EXTENSION_KEY; use strum::AsRefStr; use table::metadata::TableId; -use crate::ddl::utils::{add_peer_context_if_needed, sync_follower_regions}; +use crate::ddl::utils::{ + add_peer_context_if_needed, map_to_procedure_error, sync_follower_regions, +}; use crate::ddl::DdlContext; -use crate::error::{DecodeJsonSnafu, Error, MetadataCorruptionSnafu, Result}; +use crate::error::{DecodeJsonSnafu, MetadataCorruptionSnafu, Result}; use crate::key::table_info::TableInfoValue; use crate::key::table_route::PhysicalTableRouteValue; use crate::key::DeserializedValueWithBytes; @@ -217,14 +219,6 @@ impl Procedure for AlterLogicalTablesProcedure { } async fn execute(&mut self, _ctx: &Context) -> ProcedureResult { - let error_handler = |e: Error| { - if e.is_retry_later() { - common_procedure::Error::retry_later(e) - } else { - common_procedure::Error::external(e) - } - }; - let state = &self.data.state; let step = state.as_ref(); @@ -241,7 +235,7 @@ impl Procedure for AlterLogicalTablesProcedure { AlterTablesState::UpdateMetadata => self.on_update_metadata().await, AlterTablesState::InvalidateTableCache => self.on_invalidate_table_cache().await, } - .map_err(error_handler) + .map_err(map_to_procedure_error) } fn dump(&self) -> ProcedureResult { diff --git a/src/common/meta/src/ddl/alter_table.rs b/src/common/meta/src/ddl/alter_table.rs index ee7b15509c..bfa0a679ea 100644 --- a/src/common/meta/src/ddl/alter_table.rs +++ b/src/common/meta/src/ddl/alter_table.rs @@ -40,10 +40,11 @@ use table::table_reference::TableReference; use crate::cache_invalidator::Context; use crate::ddl::utils::{ - add_peer_context_if_needed, handle_multiple_results, sync_follower_regions, MultipleResults, + add_peer_context_if_needed, handle_multiple_results, map_to_procedure_error, + sync_follower_regions, MultipleResults, }; use crate::ddl::DdlContext; -use crate::error::{AbortProcedureSnafu, Error, NoLeaderSnafu, PutPoisonSnafu, Result}; +use crate::error::{AbortProcedureSnafu, NoLeaderSnafu, PutPoisonSnafu, Result, RetryLaterSnafu}; use crate::instruction::CacheIdent; use crate::key::table_info::TableInfoValue; use crate::key::{DeserializedValueWithBytes, RegionDistribution}; @@ -195,7 +196,10 @@ impl AlterTableProcedure { } MultipleResults::AllRetryable(error) => { // Just returns the error, and wait for the next try. - Err(error) + let err = BoxedError::new(error); + Err(err).context(RetryLaterSnafu { + clean_poisons: true, + }) } MultipleResults::Ok(results) => { self.submit_sync_region_requests(results, &physical_table_route.region_routes) @@ -323,16 +327,6 @@ impl Procedure for AlterTableProcedure { } async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult { - let error_handler = |e: Error| { - if e.is_retry_later() { - ProcedureError::retry_later(e) - } else if e.need_clean_poisons() { - ProcedureError::external_and_clean_poisons(e) - } else { - ProcedureError::external(e) - } - }; - let state = &self.data.state; let step = state.as_ref(); @@ -350,7 +344,7 @@ impl Procedure for AlterTableProcedure { AlterTableState::UpdateMetadata => self.on_update_metadata().await, AlterTableState::InvalidateTableCache => self.on_broadcast().await, } - .map_err(error_handler) + .map_err(map_to_procedure_error) } fn dump(&self) -> ProcedureResult { diff --git a/src/common/meta/src/ddl/create_database.rs b/src/common/meta/src/ddl/create_database.rs index 59d88f0744..320f1ecbcf 100644 --- a/src/common/meta/src/ddl/create_database.rs +++ b/src/common/meta/src/ddl/create_database.rs @@ -22,7 +22,7 @@ use serde_with::{serde_as, DefaultOnNull}; use snafu::{ensure, ResultExt}; use strum::AsRefStr; -use crate::ddl::utils::handle_retry_error; +use crate::ddl::utils::map_to_procedure_error; use crate::ddl::DdlContext; use crate::error::{self, Result}; use crate::key::schema_name::{SchemaNameKey, SchemaNameValue}; @@ -115,7 +115,7 @@ impl Procedure for CreateDatabaseProcedure { CreateDatabaseState::Prepare => self.on_prepare().await, CreateDatabaseState::CreateMetadata => self.on_create_metadata().await, } - .map_err(handle_retry_error) + .map_err(map_to_procedure_error) } fn dump(&self) -> ProcedureResult { diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs index 191bc59259..647f78c303 100644 --- a/src/common/meta/src/ddl/create_flow.rs +++ b/src/common/meta/src/ddl/create_flow.rs @@ -36,7 +36,7 @@ use strum::AsRefStr; use table::metadata::TableId; use crate::cache_invalidator::Context; -use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error}; +use crate::ddl::utils::{add_peer_context_if_needed, map_to_procedure_error}; use crate::ddl::DdlContext; use crate::error::{self, Result, UnexpectedSnafu}; use crate::instruction::{CacheIdent, CreateFlow, DropFlow}; @@ -304,7 +304,7 @@ impl Procedure for CreateFlowProcedure { CreateFlowState::CreateMetadata => self.on_create_metadata().await, CreateFlowState::InvalidateFlowCache => self.on_broadcast().await, } - .map_err(handle_retry_error) + .map_err(map_to_procedure_error) } fn dump(&self) -> ProcedureResult { diff --git a/src/common/meta/src/ddl/create_logical_tables.rs b/src/common/meta/src/ddl/create_logical_tables.rs index 628f17d398..06bc247f89 100644 --- a/src/common/meta/src/ddl/create_logical_tables.rs +++ b/src/common/meta/src/ddl/create_logical_tables.rs @@ -33,7 +33,9 @@ use store_api::storage::{RegionId, RegionNumber}; use strum::AsRefStr; use table::metadata::{RawTableInfo, TableId}; -use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error, sync_follower_regions}; +use crate::ddl::utils::{ + add_peer_context_if_needed, map_to_procedure_error, sync_follower_regions, +}; use crate::ddl::DdlContext; use crate::error::{DecodeJsonSnafu, MetadataCorruptionSnafu, Result}; use crate::key::table_route::TableRouteValue; @@ -238,7 +240,7 @@ impl Procedure for CreateLogicalTablesProcedure { CreateTablesState::DatanodeCreateRegions => self.on_datanode_create_regions().await, CreateTablesState::CreateMetadata => self.on_create_metadata().await, } - .map_err(handle_retry_error) + .map_err(map_to_procedure_error) } fn dump(&self) -> ProcedureResult { diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index bbd169658b..ee0dfd6c18 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -34,7 +34,7 @@ use table::table_reference::TableReference; use crate::ddl::create_table_template::{build_template, CreateRequestBuilder}; use crate::ddl::utils::{ - add_peer_context_if_needed, convert_region_routes_to_detecting_regions, handle_retry_error, + add_peer_context_if_needed, convert_region_routes_to_detecting_regions, map_to_procedure_error, region_storage_path, }; use crate::ddl::{DdlContext, TableMetadata}; @@ -319,7 +319,7 @@ impl Procedure for CreateTableProcedure { CreateTableState::DatanodeCreateRegions => self.on_datanode_create_regions().await, CreateTableState::CreateMetadata => self.on_create_metadata().await, } - .map_err(handle_retry_error) + .map_err(map_to_procedure_error) } fn dump(&self) -> ProcedureResult { diff --git a/src/common/meta/src/ddl/create_view.rs b/src/common/meta/src/ddl/create_view.rs index 349d2a84cc..f89aae8a3c 100644 --- a/src/common/meta/src/ddl/create_view.rs +++ b/src/common/meta/src/ddl/create_view.rs @@ -23,7 +23,7 @@ use table::metadata::{RawTableInfo, TableId, TableType}; use table::table_reference::TableReference; use crate::cache_invalidator::Context; -use crate::ddl::utils::handle_retry_error; +use crate::ddl::utils::map_to_procedure_error; use crate::ddl::{DdlContext, TableMetadata}; use crate::error::{self, Result}; use crate::instruction::CacheIdent; @@ -249,7 +249,7 @@ impl Procedure for CreateViewProcedure { CreateViewState::Prepare => self.on_prepare().await, CreateViewState::CreateMetadata => self.on_create_metadata(ctx).await, } - .map_err(handle_retry_error) + .map_err(map_to_procedure_error) } fn dump(&self) -> ProcedureResult { diff --git a/src/common/meta/src/ddl/drop_database.rs b/src/common/meta/src/ddl/drop_database.rs index 4aa4a1e362..55af78de30 100644 --- a/src/common/meta/src/ddl/drop_database.rs +++ b/src/common/meta/src/ddl/drop_database.rs @@ -21,7 +21,7 @@ use std::any::Any; use std::fmt::Debug; use common_error::ext::BoxedError; -use common_procedure::error::{Error as ProcedureError, ExternalSnafu, FromJsonSnafu, ToJsonSnafu}; +use common_procedure::error::{ExternalSnafu, FromJsonSnafu, ToJsonSnafu}; use common_procedure::{ Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status, }; @@ -31,6 +31,7 @@ use snafu::ResultExt; use tonic::async_trait; use self::start::DropDatabaseStart; +use crate::ddl::utils::map_to_procedure_error; use crate::ddl::DdlContext; use crate::error::Result; use crate::key::table_name::TableNameValue; @@ -141,13 +142,7 @@ impl Procedure for DropDatabaseProcedure { let (next, status) = state .next(&self.runtime_context, &mut self.context) .await - .map_err(|e| { - if e.is_retry_later() { - ProcedureError::retry_later(e) - } else { - ProcedureError::external(e) - } - })?; + .map_err(map_to_procedure_error)?; *state = next; Ok(status) diff --git a/src/common/meta/src/ddl/drop_database/executor.rs b/src/common/meta/src/ddl/drop_database/executor.rs index 21b185c189..216962a87a 100644 --- a/src/common/meta/src/ddl/drop_database/executor.rs +++ b/src/common/meta/src/ddl/drop_database/executor.rs @@ -323,6 +323,7 @@ mod tests { } .build(), ), + clean_poisons: false, }) } diff --git a/src/common/meta/src/ddl/drop_flow.rs b/src/common/meta/src/ddl/drop_flow.rs index b5058754ec..7cfb2747f2 100644 --- a/src/common/meta/src/ddl/drop_flow.rs +++ b/src/common/meta/src/ddl/drop_flow.rs @@ -30,7 +30,7 @@ use snafu::{ensure, ResultExt}; use strum::AsRefStr; use crate::cache_invalidator::Context; -use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error}; +use crate::ddl::utils::{add_peer_context_if_needed, map_to_procedure_error}; use crate::ddl::DdlContext; use crate::error::{self, Result}; use crate::flow_name::FlowName; @@ -201,7 +201,7 @@ impl Procedure for DropFlowProcedure { DropFlowState::InvalidateFlowCache => self.on_broadcast().await, DropFlowState::DropFlows => self.on_flownode_drop_flows().await, } - .map_err(handle_retry_error) + .map_err(map_to_procedure_error) } fn dump(&self) -> ProcedureResult { diff --git a/src/common/meta/src/ddl/drop_table.rs b/src/common/meta/src/ddl/drop_table.rs index d661744ce3..65ba9cf086 100644 --- a/src/common/meta/src/ddl/drop_table.rs +++ b/src/common/meta/src/ddl/drop_table.rs @@ -35,7 +35,7 @@ use table::metadata::TableId; use table::table_reference::TableReference; use self::executor::DropTableExecutor; -use crate::ddl::utils::handle_retry_error; +use crate::ddl::utils::map_to_procedure_error; use crate::ddl::DdlContext; use crate::error::{self, Result}; use crate::key::table_route::TableRouteValue; @@ -221,7 +221,7 @@ impl Procedure for DropTableProcedure { DropTableState::DatanodeDropRegions => self.on_datanode_drop_regions().await, DropTableState::DeleteTombstone => self.on_delete_metadata_tombstone().await, } - .map_err(handle_retry_error) + .map_err(map_to_procedure_error) } fn dump(&self) -> ProcedureResult { diff --git a/src/common/meta/src/ddl/drop_view.rs b/src/common/meta/src/ddl/drop_view.rs index e0b3f8b561..2f0907ed37 100644 --- a/src/common/meta/src/ddl/drop_view.rs +++ b/src/common/meta/src/ddl/drop_view.rs @@ -25,7 +25,7 @@ use table::metadata::{RawTableInfo, TableId, TableType}; use table::table_reference::TableReference; use crate::cache_invalidator::Context; -use crate::ddl::utils::handle_retry_error; +use crate::ddl::utils::map_to_procedure_error; use crate::ddl::DdlContext; use crate::error::{self, Result}; use crate::instruction::CacheIdent; @@ -191,7 +191,7 @@ impl Procedure for DropViewProcedure { DropViewState::DeleteMetadata => self.on_delete_metadata().await, DropViewState::InvalidateViewCache => self.on_broadcast().await, } - .map_err(handle_retry_error) + .map_err(map_to_procedure_error) } fn dump(&self) -> ProcedureResult { diff --git a/src/common/meta/src/ddl/test_util/datanode_handler.rs b/src/common/meta/src/ddl/test_util/datanode_handler.rs index bed78724a5..775fc644f7 100644 --- a/src/common/meta/src/ddl/test_util/datanode_handler.rs +++ b/src/common/meta/src/ddl/test_util/datanode_handler.rs @@ -105,6 +105,7 @@ impl MockDatanodeHandler for RetryErrorDatanodeHandler { } .build(), ), + clean_poisons: false, }) } @@ -218,6 +219,7 @@ impl MockDatanodeHandler for PartialSuccessDatanodeHandler { } .build(), ), + clean_poisons: false, }) } else { error::UnexpectedSnafu { @@ -252,6 +254,7 @@ impl MockDatanodeHandler for AllFailureDatanodeHandler { } .build(), ), + clean_poisons: false, }) } else { error::UnexpectedSnafu { diff --git a/src/common/meta/src/ddl/tests/alter_table.rs b/src/common/meta/src/ddl/tests/alter_table.rs index 6f24870e6e..26e07117c8 100644 --- a/src/common/meta/src/ddl/tests/alter_table.rs +++ b/src/common/meta/src/ddl/tests/alter_table.rs @@ -575,6 +575,7 @@ async fn test_on_submit_alter_request_with_partial_success_retryable() { .await .unwrap_err(); assert!(result.is_retry_later()); + assert!(!result.need_clean_poisons()); // Submits again let result = procedure @@ -582,6 +583,7 @@ async fn test_on_submit_alter_request_with_partial_success_retryable() { .await .unwrap_err(); assert!(result.is_retry_later()); + assert!(!result.need_clean_poisons()); } #[tokio::test] @@ -618,12 +620,14 @@ async fn test_on_submit_alter_request_with_all_failure_retrybale() { .await .unwrap_err(); assert!(err.is_retry_later()); + assert!(err.need_clean_poisons()); // submits again let err = procedure .submit_alter_region_requests(procedure_id, provider.as_ref()) .await .unwrap_err(); assert!(err.is_retry_later()); + assert!(err.need_clean_poisons()); } #[tokio::test] diff --git a/src/common/meta/src/ddl/truncate_table.rs b/src/common/meta/src/ddl/truncate_table.rs index 531dd36fb7..1739cdb153 100644 --- a/src/common/meta/src/ddl/truncate_table.rs +++ b/src/common/meta/src/ddl/truncate_table.rs @@ -31,7 +31,7 @@ use table::metadata::{RawTableInfo, TableId}; use table::table_name::TableName; use table::table_reference::TableReference; -use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error}; +use crate::ddl::utils::{add_peer_context_if_needed, map_to_procedure_error}; use crate::ddl::DdlContext; use crate::error::{Result, TableNotFoundSnafu}; use crate::key::table_info::TableInfoValue; @@ -66,7 +66,7 @@ impl Procedure for TruncateTableProcedure { self.on_datanode_truncate_regions().await } } - .map_err(handle_retry_error) + .map_err(map_to_procedure_error) } fn dump(&self) -> ProcedureResult { diff --git a/src/common/meta/src/ddl/utils.rs b/src/common/meta/src/ddl/utils.rs index 8c48d9f419..4ee165541f 100644 --- a/src/common/meta/src/ddl/utils.rs +++ b/src/common/meta/src/ddl/utils.rs @@ -60,11 +60,16 @@ pub fn add_peer_context_if_needed(datanode: Peer) -> impl FnOnce(Error) -> Error } } -pub fn handle_retry_error(e: Error) -> ProcedureError { - if e.is_retry_later() { - ProcedureError::retry_later(e) - } else { - ProcedureError::external(e) +/// Maps the error to the corresponding procedure error. +/// +/// This function determines whether the error should be retried and if poison cleanup is needed, +/// then maps it to the appropriate procedure error variant. +pub fn map_to_procedure_error(e: Error) -> ProcedureError { + match (e.is_retry_later(), e.need_clean_poisons()) { + (true, true) => ProcedureError::retry_later_and_clean_poisons(e), + (true, false) => ProcedureError::retry_later(e), + (false, true) => ProcedureError::external_and_clean_poisons(e), + (false, false) => ProcedureError::external(e), } } diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index d646299f1e..aa5cdc43e8 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -454,7 +454,10 @@ pub enum Error { }, #[snafu(display("Retry later"))] - RetryLater { source: BoxedError }, + RetryLater { + source: BoxedError, + clean_poisons: bool, + }, #[snafu(display("Abort procedure"))] AbortProcedure { @@ -1039,6 +1042,7 @@ impl Error { pub fn retry_later(err: E) -> Error { Error::RetryLater { source: BoxedError::new(err), + clean_poisons: false, } } @@ -1049,7 +1053,13 @@ impl Error { /// Determine whether it needs to clean poisons. pub fn need_clean_poisons(&self) -> bool { - matches!(self, Error::AbortProcedure { clean_poisons, .. } if *clean_poisons) + matches!( + self, + Error::AbortProcedure { clean_poisons, .. } if *clean_poisons + ) || matches!( + self, + Error::RetryLater { clean_poisons, .. } if *clean_poisons + ) } /// Returns true if the response exceeds the size limit. diff --git a/src/common/procedure/src/error.rs b/src/common/procedure/src/error.rs index 109b878d70..90bf9dc5f6 100644 --- a/src/common/procedure/src/error.rs +++ b/src/common/procedure/src/error.rs @@ -138,7 +138,10 @@ pub enum Error { }, #[snafu(display("Procedure exec failed"))] - RetryLater { source: BoxedError }, + RetryLater { + source: BoxedError, + clean_poisons: bool, + }, #[snafu(display("Procedure panics, procedure_id: {}", procedure_id))] ProcedurePanic { procedure_id: ProcedureId }, @@ -298,6 +301,15 @@ impl Error { pub fn retry_later(err: E) -> Error { Error::RetryLater { source: BoxedError::new(err), + clean_poisons: false, + } + } + + /// Creates a new [Error::RetryLater] error from source `err` and clean poisons. + pub fn retry_later_and_clean_poisons(err: E) -> Error { + Error::RetryLater { + source: BoxedError::new(err), + clean_poisons: true, } } @@ -309,6 +321,7 @@ impl Error { /// Determine whether it needs to clean poisons. pub fn need_clean_poisons(&self) -> bool { matches!(self, Error::External { clean_poisons, .. } if *clean_poisons) + || matches!(self, Error::RetryLater { clean_poisons, .. } if *clean_poisons) } /// Creates a new [Error::RetryLater] or [Error::External] error from source `err` according diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs index c2df1a9466..677fb33745 100644 --- a/src/common/procedure/src/local/runner.rs +++ b/src/common/procedure/src/local/runner.rs @@ -358,10 +358,11 @@ impl Runner { Err(e) => { error!( e; - "Failed to execute procedure {}-{}, retry: {}", + "Failed to execute procedure {}-{}, retry: {}, clean_poisons: {}", self.procedure.type_name(), self.meta.id, e.is_retry_later(), + e.need_clean_poisons(), ); // Don't store state if `ProcedureManager` is stopped. @@ -378,6 +379,11 @@ impl Runner { self.meta.set_state(ProcedureState::retrying(Arc::new(e))); return; } + debug!( + "Procedure {}-{} cleaned poisons", + self.procedure.type_name(), + self.meta.id, + ); } if e.is_retry_later() { @@ -581,6 +587,7 @@ impl Runner { #[cfg(test)] mod tests { + use std::assert_matches::assert_matches; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; @@ -1598,6 +1605,75 @@ mod tests { assert_eq!(&procedure_id.to_string(), ROOT_ID); } + #[tokio::test] + async fn test_execute_exceed_max_retry_after_set_poison() { + common_telemetry::init_default_ut_logging(); + let mut times = 0; + let poison_key = PoisonKey::new("table/1024"); + let moved_poison_key = poison_key.clone(); + let exec_fn = move |ctx: Context| { + times += 1; + let poison_key = moved_poison_key.clone(); + async move { + if times == 1 { + Ok(Status::executing(true)) + } else { + // Put the poison to the context. + ctx.provider + .try_put_poison(&poison_key, ctx.procedure_id) + .await + .unwrap(); + Err(Error::retry_later_and_clean_poisons(MockError::new( + StatusCode::Unexpected, + ))) + } + } + .boxed() + }; + let poison = ProcedureAdapter { + data: "poison".to_string(), + lock_key: LockKey::single_exclusive("catalog.schema.table"), + poison_keys: PoisonKeys::new(vec![poison_key.clone()]), + exec_fn, + rollback_fn: None, + }; + + let dir = create_temp_dir("exceed_max_after_set_poison"); + let meta = poison.new_meta(ROOT_ID); + let object_store = test_util::new_object_store(&dir); + let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); + let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store); + runner.manager_ctx.start(); + runner.exponential_builder = ExponentialBuilder::default() + .with_min_delay(Duration::from_millis(1)) + .with_max_times(3); + // Use the manager ctx as the context provider. + let ctx = context_with_provider( + meta.id, + runner.manager_ctx.clone() as Arc, + ); + // Manually add this procedure to the manager ctx. + runner + .manager_ctx + .procedures + .write() + .unwrap() + .insert(meta.id, runner.meta.clone()); + // Run the runner and execute the procedure. + runner.execute_once_with_retry(&ctx).await; + let err = meta.state().error().unwrap().clone(); + assert_matches!(&*err, Error::RetryTimesExceeded { .. }); + + // Check the poison is deleted. + let procedure_id = runner + .manager_ctx + .poison_manager + .get_poison(&poison_key.to_string()) + .await + .unwrap(); + assert_eq!(procedure_id, None); + } + #[tokio::test] async fn test_execute_poisoned() { let mut times = 0;