From 917510ffd0828d8341e4bd33d552edaece73f371 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Mon, 7 Apr 2025 16:25:13 +0800 Subject: [PATCH] feat: introduce poison mechanism for procedure (#5822) * feat: introduce poison for procedure * tests: add unit tests * refactor: minor refactor * fix: unit tests * chore: fix unit tests * chore: apply suggestions from CR * chore: apply suggestions from CR * chore: update comments * chore: introduce `ProcedureStatus::Poisoned` * chore: upgrade greptime-proto to `2be0f` * chore: apply suggestions from CR --- Cargo.lock | 3 +- Cargo.toml | 2 +- src/common/meta/src/ddl/test_util.rs | 8 +- .../src/ddl/tests/alter_logical_tables.rs | 80 +++- .../src/ddl/tests/create_logical_tables.rs | 40 +- src/common/meta/src/ddl/tests/create_table.rs | 8 +- src/common/meta/src/ddl/tests/create_view.rs | 8 +- src/common/meta/src/ddl_manager.rs | 8 +- src/common/meta/src/error.rs | 14 +- src/common/meta/src/key.rs | 4 +- src/common/meta/src/rpc/procedure.rs | 1 + src/common/meta/src/state_store.rs | 250 ++++++++++++- src/common/procedure-test/Cargo.toml | 3 +- src/common/procedure-test/src/lib.rs | 21 +- src/common/procedure/src/error.rs | 53 ++- src/common/procedure/src/lib.rs | 7 +- src/common/procedure/src/local.rs | 112 ++++-- src/common/procedure/src/local/runner.rs | 349 +++++++++++++++++- src/common/procedure/src/procedure.rs | 124 ++++++- src/common/procedure/src/store.rs | 6 + .../procedure/src/store/poison_store.rs | 59 +++ src/common/procedure/src/test_util.rs | 85 +++++ src/common/procedure/src/watcher.rs | 13 +- src/frontend/src/instance.rs | 8 +- src/meta-srv/src/metasrv/builder.rs | 19 +- .../procedure/region_migration/test_util.rs | 8 +- src/meta-srv/src/procedure/tests.rs | 16 +- src/meta-srv/src/procedure/wal_prune.rs | 8 +- 28 files changed, 1226 insertions(+), 91 deletions(-) create mode 100644 src/common/procedure/src/store/poison_store.rs create mode 100644 src/common/procedure/src/test_util.rs diff --git a/Cargo.lock b/Cargo.lock index 344edf2c9d..d8b80ca363 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2268,6 +2268,7 @@ version = "0.14.0" dependencies = [ "async-trait", "common-procedure", + "snafu 0.8.5", ] [[package]] @@ -4690,7 +4691,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=fb8e20ce29afd81835e3ea3c1164c8ce10de2c65#fb8e20ce29afd81835e3ea3c1164c8ce10de2c65" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=2be0f36b3264e28ab0e1c22a980d0bb634eb3a77#2be0f36b3264e28ab0e1c22a980d0bb634eb3a77" dependencies = [ "prost 0.13.3", "serde", diff --git a/Cargo.toml b/Cargo.toml index 262b5fe221..14914f4d90 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -130,7 +130,7 @@ etcd-client = "0.14" fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "fb8e20ce29afd81835e3ea3c1164c8ce10de2c65" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "2be0f36b3264e28ab0e1c22a980d0bb634eb3a77" } hex = "0.4" http = "1" humantime = "2.1" diff --git a/src/common/meta/src/ddl/test_util.rs b/src/common/meta/src/ddl/test_util.rs index 4d6a6c63b5..6707306f3b 100644 --- a/src/common/meta/src/ddl/test_util.rs +++ b/src/common/meta/src/ddl/test_util.rs @@ -80,7 +80,13 @@ pub async fn create_logical_table( let tasks = vec![test_create_logical_table_task(table_name)]; let mut procedure = CreateLogicalTablesProcedure::new(tasks, physical_table_id, ddl_context); let status = procedure.on_prepare().await.unwrap(); - assert_matches!(status, Status::Executing { persist: true }); + assert_matches!( + status, + Status::Executing { + persist: true, + clean_poisons: false + } + ); let status = procedure.on_create_metadata().await.unwrap(); assert_matches!(status, Status::Done { .. }); diff --git a/src/common/meta/src/ddl/tests/alter_logical_tables.rs b/src/common/meta/src/ddl/tests/alter_logical_tables.rs index 4fa9992a17..1c22cdf6f4 100644 --- a/src/common/meta/src/ddl/tests/alter_logical_tables.rs +++ b/src/common/meta/src/ddl/tests/alter_logical_tables.rs @@ -180,7 +180,13 @@ async fn test_on_prepare() { let mut procedure = AlterLogicalTablesProcedure::new(tasks, phy_id, ddl_context); let result = procedure.on_prepare().await; - assert_matches!(result, Ok(Status::Executing { persist: true })); + assert_matches!( + result, + Ok(Status::Executing { + persist: true, + clean_poisons: false + }) + ); } #[tokio::test] @@ -205,7 +211,13 @@ async fn test_on_update_metadata() { let mut procedure = AlterLogicalTablesProcedure::new(tasks, phy_id, ddl_context); let mut status = procedure.on_prepare().await.unwrap(); - assert_matches!(status, Status::Executing { persist: true }); + assert_matches!( + status, + Status::Executing { + persist: true, + clean_poisons: false + } + ); let ctx = common_procedure::Context { procedure_id: ProcedureId::random(), @@ -213,10 +225,22 @@ async fn test_on_update_metadata() { }; // on_submit_alter_region_requests status = procedure.execute(&ctx).await.unwrap(); - assert_matches!(status, Status::Executing { persist: true }); + assert_matches!( + status, + Status::Executing { + persist: true, + clean_poisons: false + } + ); // on_update_metadata status = procedure.execute(&ctx).await.unwrap(); - assert_matches!(status, Status::Executing { persist: true }); + assert_matches!( + status, + Status::Executing { + persist: true, + clean_poisons: false + } + ); } #[tokio::test] @@ -237,7 +261,13 @@ async fn test_on_part_duplicate_alter_request() { let mut procedure = AlterLogicalTablesProcedure::new(tasks, phy_id, ddl_context.clone()); let mut status = procedure.on_prepare().await.unwrap(); - assert_matches!(status, Status::Executing { persist: true }); + assert_matches!( + status, + Status::Executing { + persist: true, + clean_poisons: false + } + ); let ctx = common_procedure::Context { procedure_id: ProcedureId::random(), @@ -245,10 +275,22 @@ async fn test_on_part_duplicate_alter_request() { }; // on_submit_alter_region_requests status = procedure.execute(&ctx).await.unwrap(); - assert_matches!(status, Status::Executing { persist: true }); + assert_matches!( + status, + Status::Executing { + persist: true, + clean_poisons: false + } + ); // on_update_metadata status = procedure.execute(&ctx).await.unwrap(); - assert_matches!(status, Status::Executing { persist: true }); + assert_matches!( + status, + Status::Executing { + persist: true, + clean_poisons: false + } + ); // re-alter let tasks = vec![ @@ -270,7 +312,13 @@ async fn test_on_part_duplicate_alter_request() { let mut procedure = AlterLogicalTablesProcedure::new(tasks, phy_id, ddl_context.clone()); let mut status = procedure.on_prepare().await.unwrap(); - assert_matches!(status, Status::Executing { persist: true }); + assert_matches!( + status, + Status::Executing { + persist: true, + clean_poisons: false + } + ); let ctx = common_procedure::Context { procedure_id: ProcedureId::random(), @@ -278,10 +326,22 @@ async fn test_on_part_duplicate_alter_request() { }; // on_submit_alter_region_requests status = procedure.execute(&ctx).await.unwrap(); - assert_matches!(status, Status::Executing { persist: true }); + assert_matches!( + status, + Status::Executing { + persist: true, + clean_poisons: false + } + ); // on_update_metadata status = procedure.execute(&ctx).await.unwrap(); - assert_matches!(status, Status::Executing { persist: true }); + assert_matches!( + status, + Status::Executing { + persist: true, + clean_poisons: false + } + ); let table_name_keys = vec![ TableNameKey::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "table1"), diff --git a/src/common/meta/src/ddl/tests/create_logical_tables.rs b/src/common/meta/src/ddl/tests/create_logical_tables.rs index a331b32bb0..fb5518d463 100644 --- a/src/common/meta/src/ddl/tests/create_logical_tables.rs +++ b/src/common/meta/src/ddl/tests/create_logical_tables.rs @@ -69,7 +69,13 @@ async fn test_on_prepare() { let physical_table_id = table_id; let mut procedure = CreateLogicalTablesProcedure::new(tasks, physical_table_id, ddl_context); let status = procedure.on_prepare().await.unwrap(); - assert_matches!(status, Status::Executing { persist: true }); + assert_matches!( + status, + Status::Executing { + persist: true, + clean_poisons: false + } + ); } #[tokio::test] @@ -202,7 +208,13 @@ async fn test_on_prepare_part_logical_tables_exist() { ddl_context, ); let status = procedure.on_prepare().await.unwrap(); - assert_matches!(status, Status::Executing { persist: true }); + assert_matches!( + status, + Status::Executing { + persist: true, + clean_poisons: false + } + ); } #[tokio::test] @@ -238,7 +250,13 @@ async fn test_on_create_metadata() { ddl_context, ); let status = procedure.on_prepare().await.unwrap(); - assert_matches!(status, Status::Executing { persist: true }); + assert_matches!( + status, + Status::Executing { + persist: true, + clean_poisons: false + } + ); let ctx = ProcedureContext { procedure_id: ProcedureId::random(), provider: Arc::new(MockContextProvider::default()), @@ -294,7 +312,13 @@ async fn test_on_create_metadata_part_logical_tables_exist() { ddl_context, ); let status = procedure.on_prepare().await.unwrap(); - assert_matches!(status, Status::Executing { persist: true }); + assert_matches!( + status, + Status::Executing { + persist: true, + clean_poisons: false + } + ); let ctx = ProcedureContext { procedure_id: ProcedureId::random(), provider: Arc::new(MockContextProvider::default()), @@ -339,7 +363,13 @@ async fn test_on_create_metadata_err() { ddl_context.clone(), ); let status = procedure.on_prepare().await.unwrap(); - assert_matches!(status, Status::Executing { persist: true }); + assert_matches!( + status, + Status::Executing { + persist: true, + clean_poisons: false + } + ); let ctx = ProcedureContext { procedure_id: ProcedureId::random(), provider: Arc::new(MockContextProvider::default()), diff --git a/src/common/meta/src/ddl/tests/create_table.rs b/src/common/meta/src/ddl/tests/create_table.rs index e62329c780..c4cae2233d 100644 --- a/src/common/meta/src/ddl/tests/create_table.rs +++ b/src/common/meta/src/ddl/tests/create_table.rs @@ -137,7 +137,13 @@ async fn test_on_prepare_without_create_if_table_exists() { task.create_table.create_if_not_exists = true; let mut procedure = CreateTableProcedure::new(task, ddl_context); let status = procedure.on_prepare().await.unwrap(); - assert_matches!(status, Status::Executing { persist: true }); + assert_matches!( + status, + Status::Executing { + persist: true, + clean_poisons: false + } + ); assert_eq!(procedure.table_id(), 1024); } diff --git a/src/common/meta/src/ddl/tests/create_view.rs b/src/common/meta/src/ddl/tests/create_view.rs index 3f833333d2..5680fce71b 100644 --- a/src/common/meta/src/ddl/tests/create_view.rs +++ b/src/common/meta/src/ddl/tests/create_view.rs @@ -153,7 +153,13 @@ async fn test_on_prepare_without_create_if_table_exists() { task.create_view.create_if_not_exists = true; let mut procedure = CreateViewProcedure::new(task, ddl_context); let status = procedure.on_prepare().await.unwrap(); - assert_matches!(status, Status::Executing { persist: true }); + assert_matches!( + status, + Status::Executing { + persist: true, + clean_poisons: false + } + ); assert_eq!(procedure.view_id(), 1024); } diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index f3f54a0987..c943c70bc9 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -834,6 +834,7 @@ mod tests { use std::sync::Arc; use common_procedure::local::LocalManager; + use common_procedure::test_util::InMemoryPoisonStore; use super::DdlManager; use crate::cache_invalidator::DummyCacheInvalidator; @@ -883,7 +884,12 @@ mod tests { )); let state_store = Arc::new(KvStateStore::new(kv_backend.clone())); - let procedure_manager = Arc::new(LocalManager::new(Default::default(), state_store)); + let poison_manager = Arc::new(InMemoryPoisonStore::default()); + let procedure_manager = Arc::new(LocalManager::new( + Default::default(), + state_store, + poison_manager, + )); let _ = DdlManager::try_new( DdlContext { diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index e83aefbe33..56ef212cec 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -748,6 +748,18 @@ pub enum Error { #[snafu(source)] error: serde_json::Error, }, + + #[snafu(display( + "Procedure poison key already exists with a different value, key: {}, value: {}", + key, + value + ))] + ProcedurePoisonConflict { + key: String, + value: String, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -766,7 +778,7 @@ impl ErrorExt for Error { | SerializeToJson { .. } | DeserializeFromJson { .. } => StatusCode::Internal, - ValueNotExist { .. } => StatusCode::Unexpected, + ValueNotExist { .. } | ProcedurePoisonConflict { .. } => StatusCode::Unexpected, Unsupported { .. } => StatusCode::Unsupported, diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index ada1eb42a9..d89236b564 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -156,6 +156,7 @@ use crate::kv_backend::txn::{Txn, TxnOp}; use crate::kv_backend::KvBackendRef; use crate::rpc::router::{region_distribution, LeaderState, RegionRoute}; use crate::rpc::store::BatchDeleteRequest; +use crate::state_store::PoisonValue; use crate::DatanodeId; pub const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*"; @@ -1324,7 +1325,8 @@ impl_metadata_value! { TableFlowValue, NodeAddressValue, SchemaNameValue, - FlowStateValue + FlowStateValue, + PoisonValue } impl_optional_metadata_value! { diff --git a/src/common/meta/src/rpc/procedure.rs b/src/common/meta/src/rpc/procedure.rs index 7dd51593d1..c8a43ffcb2 100644 --- a/src/common/meta/src/rpc/procedure.rs +++ b/src/common/meta/src/rpc/procedure.rs @@ -81,6 +81,7 @@ pub fn procedure_state_to_pb_state(state: &ProcedureState) -> (PbProcedureStatus ProcedureState::RollingBack { error } => { (PbProcedureStatus::RollingBack, error.to_string()) } + ProcedureState::Poisoned { error, .. } => (PbProcedureStatus::Poisoned, error.to_string()), } } diff --git a/src/common/meta/src/state_store.rs b/src/common/meta/src/state_store.rs index c40e1a8cab..6649eae065 100644 --- a/src/common/meta/src/state_store.rs +++ b/src/common/meta/src/state_store.rs @@ -14,16 +14,23 @@ use async_trait::async_trait; use common_error::ext::BoxedError; -use common_procedure::error::{DeleteStatesSnafu, ListStateSnafu, PutStateSnafu}; +use common_procedure::error::{ + DeletePoisonSnafu, DeleteStatesSnafu, GetPoisonSnafu, ListStateSnafu, PutPoisonSnafu, + PutStateSnafu, Result as ProcedureResult, +}; +use common_procedure::store::poison_store::PoisonStore; use common_procedure::store::state_store::{KeySet, KeyValueStream, StateStore}; use common_procedure::store::util::multiple_value_stream; -use common_procedure::Result as ProcedureResult; use futures::future::try_join_all; use futures::StreamExt; use itertools::Itertools; -use snafu::ResultExt; +use serde::{Deserialize, Serialize}; +use snafu::{ensure, OptionExt, ResultExt}; -use crate::error::Result; +use crate::error::{ProcedurePoisonConflictSnafu, Result, UnexpectedSnafu}; +use crate::key::txn_helper::TxnOpGetResponseSet; +use crate::key::{DeserializedValueWithBytes, MetadataValue}; +use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp}; use crate::kv_backend::KvBackendRef; use crate::range_stream::PaginationStream; use crate::rpc::store::{BatchDeleteRequest, PutRequest, RangeRequest}; @@ -32,11 +39,16 @@ use crate::rpc::KeyValue; const DELIMITER: &str = "/"; const PROCEDURE_PREFIX: &str = "/__procedure__/"; +const PROCEDURE_POISON_KEY_PREFIX: &str = "/__procedure_poison/"; fn with_prefix(key: &str) -> String { format!("{PROCEDURE_PREFIX}{key}") } +fn with_poison_prefix(key: &str) -> String { + format!("{}{}", PROCEDURE_POISON_KEY_PREFIX, key) +} + fn strip_prefix(key: &str) -> String { key.trim_start_matches(PROCEDURE_PREFIX).to_string() } @@ -207,8 +219,168 @@ impl StateStore for KvStateStore { } } +/// The value of the poison key. +/// +/// Each poison value contains a unique token that identifies the procedure. +/// While multiple procedures may use the same poison key (representing the same resource), +/// each procedure will have a distinct token value to differentiate its ownership. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct PoisonValue { + token: String, +} + +type PoisonDecodeResult = Result>>; + +impl KvStateStore { + /// Builds a create poison transaction, + /// it expected the `__procedure_poison/{key}` wasn't occupied. + fn build_create_poison_txn( + &self, + key: &str, + value: &PoisonValue, + ) -> Result<( + Txn, + impl FnOnce(&mut TxnOpGetResponseSet) -> PoisonDecodeResult, + )> { + let key = key.as_bytes().to_vec(); + let value = value.try_as_raw_value()?; + let txn = Txn::put_if_not_exists(key.clone(), value); + + Ok(( + txn, + TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(key)), + )) + } + + /// Builds a delete poison transaction, + /// it expected the `__procedure_poison/{key}` was occupied. + fn build_delete_poison_txn( + &self, + key: &str, + value: PoisonValue, + ) -> Result<( + Txn, + impl FnOnce(&mut TxnOpGetResponseSet) -> PoisonDecodeResult, + )> { + let key = key.as_bytes().to_vec(); + let value = value.try_as_raw_value()?; + + let txn = Txn::new() + .when(vec![Compare::with_value( + key.clone(), + CompareOp::Equal, + value, + )]) + .and_then(vec![TxnOp::Delete(key.clone())]) + .or_else(vec![TxnOp::Get(key.clone())]); + + Ok(( + txn, + TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(key)), + )) + } + + async fn get_poison_inner(&self, key: &str) -> Result> { + let key = with_poison_prefix(key); + let value = self.kv_backend.get(key.as_bytes()).await?; + value + .map(|v| PoisonValue::try_from_raw_value(&v.value)) + .transpose() + } + + /// Put the poison. + /// + /// If the poison is already put by other procedure, it will return an error. + async fn set_poison_inner(&self, key: &str, token: &str) -> Result<()> { + let key = with_poison_prefix(key); + let (txn, on_failure) = self.build_create_poison_txn( + &key, + &PoisonValue { + token: token.to_string(), + }, + )?; + + let mut resp = self.kv_backend.txn(txn).await?; + + if !resp.succeeded { + let mut set = TxnOpGetResponseSet::from(&mut resp.responses); + let remote_value = on_failure(&mut set)? + .context(UnexpectedSnafu { + err_msg: "Reads the empty poison value in comparing operation of the put consistency poison", + })? + .into_inner(); + + ensure!( + remote_value.token == token, + ProcedurePoisonConflictSnafu { + key: &key, + value: &remote_value.token, + } + ); + } + + Ok(()) + } + + /// Deletes the poison. + /// + /// If the poison is not put by the procedure, it will return an error. + async fn delete_poison_inner(&self, key: &str, token: &str) -> Result<()> { + let key = with_poison_prefix(key); + let (txn, on_failure) = self.build_delete_poison_txn( + &key, + PoisonValue { + token: token.to_string(), + }, + )?; + + let mut resp = self.kv_backend.txn(txn).await?; + + if !resp.succeeded { + let mut set = TxnOpGetResponseSet::from(&mut resp.responses); + let remote_value = on_failure(&mut set)?; + + ensure!( + remote_value.is_none(), + ProcedurePoisonConflictSnafu { + key: &key, + value: &remote_value.unwrap().into_inner().token, + } + ); + } + + Ok(()) + } +} + +#[async_trait] +impl PoisonStore for KvStateStore { + async fn try_put_poison(&self, key: String, token: String) -> ProcedureResult<()> { + self.set_poison_inner(&key, &token) + .await + .map_err(BoxedError::new) + .context(PutPoisonSnafu { key, token }) + } + + async fn delete_poison(&self, key: String, token: String) -> ProcedureResult<()> { + self.delete_poison_inner(&key, &token) + .await + .map_err(BoxedError::new) + .context(DeletePoisonSnafu { key, token }) + } + + async fn get_poison(&self, key: &str) -> ProcedureResult> { + self.get_poison_inner(key) + .await + .map(|v| v.map(|v| v.token)) + .map_err(BoxedError::new) + .context(GetPoisonSnafu { key }) + } +} + #[cfg(test)] mod tests { + use std::assert_matches::assert_matches; use std::env; use std::sync::Arc; @@ -219,6 +391,7 @@ mod tests { use uuid::Uuid; use super::*; + use crate::error::Error; use crate::kv_backend::chroot::ChrootKvBackend; use crate::kv_backend::etcd::EtcdStore; use crate::kv_backend::memory::MemoryKvBackend; @@ -397,4 +570,73 @@ mod tests { ) .await; } + + #[tokio::test] + async fn test_poison() { + let mem_kv = Arc::new(MemoryKvBackend::default()); + let poison_manager = KvStateStore::new(mem_kv.clone()); + + let key = "table/1"; + + let token = "expected_token"; + + poison_manager.set_poison_inner(key, token).await.unwrap(); + + // Put again, should be ok. + poison_manager.set_poison_inner(key, token).await.unwrap(); + + // Delete, should be ok. + poison_manager + .delete_poison_inner(key, token) + .await + .unwrap(); + + // Delete again, should be ok. + poison_manager + .delete_poison_inner(key, token) + .await + .unwrap(); + } + + #[tokio::test] + async fn test_consistency_poison_failed() { + let mem_kv = Arc::new(MemoryKvBackend::default()); + let poison_manager = KvStateStore::new(mem_kv.clone()); + + let key = "table/1"; + + let token = "expected_token"; + let token2 = "expected_token2"; + + poison_manager.set_poison_inner(key, token).await.unwrap(); + + let err = poison_manager + .set_poison_inner(key, token2) + .await + .unwrap_err(); + assert_matches!(err, Error::ProcedurePoisonConflict { .. }); + + let err = poison_manager + .delete_poison_inner(key, token2) + .await + .unwrap_err(); + assert_matches!(err, Error::ProcedurePoisonConflict { .. }); + } + + #[test] + fn test_serialize_deserialize() { + let key = "table/1"; + let value = PoisonValue { + token: "expected_token".to_string(), + }; + + let serialized_key = with_poison_prefix(key).as_bytes().to_vec(); + let serialized_value = value.try_as_raw_value().unwrap(); + + let expected_key = "/__procedure_poison/table/1"; + let expected_value = r#"{"token":"expected_token"}"#; + + assert_eq!(expected_key.as_bytes(), serialized_key); + assert_eq!(expected_value.as_bytes(), serialized_value); + } } diff --git a/src/common/procedure-test/Cargo.toml b/src/common/procedure-test/Cargo.toml index 2e76747270..07c8436646 100644 --- a/src/common/procedure-test/Cargo.toml +++ b/src/common/procedure-test/Cargo.toml @@ -9,4 +9,5 @@ workspace = true [dependencies] async-trait.workspace = true -common-procedure.workspace = true +common-procedure = { workspace = true, features = ["testing"] } +snafu.workspace = true diff --git a/src/common/procedure-test/src/lib.rs b/src/common/procedure-test/src/lib.rs index 9f7487aed3..ce1960c778 100644 --- a/src/common/procedure-test/src/lib.rs +++ b/src/common/procedure-test/src/lib.rs @@ -18,21 +18,27 @@ use std::collections::HashMap; use std::sync::Arc; use async_trait::async_trait; +use common_procedure::store::poison_store::PoisonStore; +use common_procedure::test_util::InMemoryPoisonStore; use common_procedure::{ - Context, ContextProvider, Output, Procedure, ProcedureId, ProcedureState, ProcedureWithId, - Result, Status, + Context, ContextProvider, Output, PoisonKey, Procedure, ProcedureId, ProcedureState, + ProcedureWithId, Result, Status, }; /// A Mock [ContextProvider]. #[derive(Default)] pub struct MockContextProvider { states: HashMap, + poison_manager: InMemoryPoisonStore, } impl MockContextProvider { /// Returns a new provider. pub fn new(states: HashMap) -> MockContextProvider { - MockContextProvider { states } + MockContextProvider { + states, + poison_manager: InMemoryPoisonStore::default(), + } } } @@ -41,6 +47,12 @@ impl ContextProvider for MockContextProvider { async fn procedure_state(&self, procedure_id: ProcedureId) -> Result> { Ok(self.states.get(&procedure_id).cloned()) } + + async fn try_put_poison(&self, key: &PoisonKey, procedure_id: ProcedureId) -> Result<()> { + self.poison_manager + .try_put_poison(key.to_string(), procedure_id.to_string()) + .await + } } /// Executes a procedure until it returns [Status::Done]. @@ -61,6 +73,7 @@ pub async fn execute_procedure_until_done(procedure: &mut dyn Procedure) -> Opti "Executing subprocedure is unsupported" ), Status::Done { output } => return output, + Status::Poisoned { .. } => return None, } } } @@ -88,6 +101,7 @@ pub async fn execute_procedure_once( false } Status::Done { .. } => true, + Status::Poisoned { .. } => false, } } @@ -109,6 +123,7 @@ pub async fn execute_until_suspended_or_done( Status::Executing { .. } => (), Status::Suspended { subprocedures, .. } => return Some(subprocedures), Status::Done { .. } => break, + Status::Poisoned { .. } => unreachable!(), } } diff --git a/src/common/procedure/src/error.rs b/src/common/procedure/src/error.rs index c1efe25734..9eea4dd0e7 100644 --- a/src/common/procedure/src/error.rs +++ b/src/common/procedure/src/error.rs @@ -21,6 +21,7 @@ use common_macro::stack_trace_debug; use snafu::{Location, Snafu}; use crate::procedure::ProcedureId; +use crate::PoisonKey; /// Procedure error. #[derive(Snafu)] @@ -73,6 +74,32 @@ pub enum Error { source: BoxedError, }, + #[snafu(display("Failed to put poison, key: '{key}', token: '{token}'"))] + PutPoison { + key: String, + token: String, + #[snafu(implicit)] + location: Location, + source: BoxedError, + }, + + #[snafu(display("Failed to get poison, key: '{key}'"))] + GetPoison { + key: String, + #[snafu(implicit)] + location: Location, + source: BoxedError, + }, + + #[snafu(display("Failed to delete poison, key: '{key}', token: '{token}'"))] + DeletePoison { + key: String, + token: String, + #[snafu(implicit)] + location: Location, + source: BoxedError, + }, + #[snafu(display("Failed to delete {}", key))] DeleteState { key: String, @@ -182,6 +209,21 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Procedure not found, procedure_id: {}", procedure_id))] + ProcedureNotFound { + procedure_id: ProcedureId, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Poison key not defined, key: '{key}', procedure_id: '{procedure_id}'"))] + PoisonKeyNotDefined { + key: PoisonKey, + procedure_id: ProcedureId, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -192,7 +234,10 @@ impl ErrorExt for Error { Error::External { source, .. } | Error::PutState { source, .. } | Error::DeleteStates { source, .. } - | Error::ListState { source, .. } => source.status_code(), + | Error::ListState { source, .. } + | Error::PutPoison { source, .. } + | Error::DeletePoison { source, .. } + | Error::GetPoison { source, .. } => source.status_code(), Error::ToJson { .. } | Error::DeleteState { .. } @@ -200,7 +245,8 @@ impl ErrorExt for Error { | Error::WaitWatcher { .. } | Error::RetryLater { .. } | Error::RollbackProcedureRecovered { .. } - | Error::TooManyRunningProcedures { .. } => StatusCode::Internal, + | Error::TooManyRunningProcedures { .. } + | Error::PoisonKeyNotDefined { .. } => StatusCode::Internal, Error::RetryTimesExceeded { .. } | Error::RollbackTimesExceeded { .. } @@ -212,7 +258,8 @@ impl ErrorExt for Error { } Error::ProcedurePanic { .. } | Error::ParseSegmentKey { .. } - | Error::Unexpected { .. } => StatusCode::Unexpected, + | Error::Unexpected { .. } + | &Error::ProcedureNotFound { .. } => StatusCode::Unexpected, Error::ProcedureExec { source, .. } => source.status_code(), Error::StartRemoveOutdatedMetaTask { source, .. } | Error::StopRemoveOutdatedMetaTask { source, .. } => source.status_code(), diff --git a/src/common/procedure/src/lib.rs b/src/common/procedure/src/lib.rs index 269ad2c529..e737366926 100644 --- a/src/common/procedure/src/lib.rs +++ b/src/common/procedure/src/lib.rs @@ -23,10 +23,13 @@ mod procedure; pub mod store; pub mod watcher; +#[cfg(any(test, feature = "testing"))] +pub mod test_util; + pub use crate::error::{Error, Result}; pub use crate::procedure::{ BoxedProcedure, BoxedProcedureLoader, Context, ContextProvider, LockKey, Output, ParseIdError, - Procedure, ProcedureId, ProcedureInfo, ProcedureManager, ProcedureManagerRef, ProcedureState, - ProcedureWithId, Status, StringKey, + PoisonKey, Procedure, ProcedureId, ProcedureInfo, ProcedureManager, ProcedureManagerRef, + ProcedureState, ProcedureWithId, Status, StringKey, }; pub use crate::watcher::Watcher; diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index 9554b325c5..fb7f93267a 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -26,22 +26,23 @@ use backon::ExponentialBuilder; use common_runtime::{RepeatedTask, TaskFunction}; use common_telemetry::tracing_context::{FutureExt, TracingContext}; use common_telemetry::{error, info, tracing}; -use snafu::{ensure, ResultExt}; +use snafu::{ensure, OptionExt, ResultExt}; use tokio::sync::watch::{self, Receiver, Sender}; use tokio::sync::{Mutex as TokioMutex, Notify}; use self::rwlock::KeyRwLock; use crate::error::{ - self, DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ManagerNotStartSnafu, Result, - StartRemoveOutdatedMetaTaskSnafu, StopRemoveOutdatedMetaTaskSnafu, - TooManyRunningProceduresSnafu, + self, DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ManagerNotStartSnafu, + PoisonKeyNotDefinedSnafu, ProcedureNotFoundSnafu, Result, StartRemoveOutdatedMetaTaskSnafu, + StopRemoveOutdatedMetaTaskSnafu, TooManyRunningProceduresSnafu, }; use crate::local::runner::Runner; -use crate::procedure::{BoxedProcedureLoader, InitProcedureState, ProcedureInfo}; +use crate::procedure::{BoxedProcedureLoader, InitProcedureState, PoisonKeys, ProcedureInfo}; +use crate::store::poison_store::PoisonStoreRef; use crate::store::{ProcedureMessage, ProcedureMessages, ProcedureStore, StateStoreRef}; use crate::{ - BoxedProcedure, ContextProvider, LockKey, ProcedureId, ProcedureManager, ProcedureState, - ProcedureWithId, Watcher, + BoxedProcedure, ContextProvider, LockKey, PoisonKey, ProcedureId, ProcedureManager, + ProcedureState, ProcedureWithId, Watcher, }; /// The expired time of a procedure's metadata. @@ -67,6 +68,8 @@ pub(crate) struct ProcedureMeta { child_notify: Notify, /// Lock required by this procedure. lock_key: LockKey, + /// Poison keys that may cause this procedure to become poisoned during execution. + poison_keys: PoisonKeys, /// Sender to notify the procedure state. state_sender: Sender, /// Receiver to watch the procedure state. @@ -85,6 +88,7 @@ impl ProcedureMeta { procedure_state: ProcedureState, parent_id: Option, lock_key: LockKey, + poison_keys: PoisonKeys, type_name: &str, ) -> ProcedureMeta { let (state_sender, state_receiver) = watch::channel(procedure_state); @@ -93,6 +97,7 @@ impl ProcedureMeta { parent_id, child_notify: Notify::new(), lock_key, + poison_keys, state_sender, state_receiver, children: Mutex::new(Vec::new()), @@ -163,6 +168,8 @@ pub(crate) struct ManagerContext { finished_procedures: Mutex>, /// Running flag. running: Arc, + /// Poison manager. + poison_manager: PoisonStoreRef, } #[async_trait] @@ -170,11 +177,33 @@ impl ContextProvider for ManagerContext { async fn procedure_state(&self, procedure_id: ProcedureId) -> Result> { Ok(self.state(procedure_id)) } + + async fn try_put_poison(&self, key: &PoisonKey, procedure_id: ProcedureId) -> Result<()> { + { + // validate the procedure exists + let procedures = self.procedures.read().unwrap(); + let procedure = procedures + .get(&procedure_id) + .context(ProcedureNotFoundSnafu { procedure_id })?; + + // validate the poison key is defined + ensure!( + procedure.poison_keys.contains(key), + PoisonKeyNotDefinedSnafu { + key: key.clone(), + procedure_id + } + ); + } + let key = key.to_string(); + let procedure_id = procedure_id.to_string(); + self.poison_manager.try_put_poison(key, procedure_id).await + } } impl ManagerContext { /// Returns a new [ManagerContext]. - fn new() -> ManagerContext { + fn new(poison_manager: PoisonStoreRef) -> ManagerContext { ManagerContext { key_lock: KeyRwLock::new(), loaders: Mutex::new(HashMap::new()), @@ -182,6 +211,7 @@ impl ManagerContext { running_procedures: Mutex::new(HashSet::new()), finished_procedures: Mutex::new(VecDeque::new()), running: Arc::new(AtomicBool::new(false)), + poison_manager, } } @@ -433,8 +463,12 @@ pub struct LocalManager { impl LocalManager { /// Create a new [LocalManager] with specific `config`. - pub fn new(config: ManagerConfig, state_store: StateStoreRef) -> LocalManager { - let manager_ctx = Arc::new(ManagerContext::new()); + pub fn new( + config: ManagerConfig, + state_store: StateStoreRef, + poison_store: PoisonStoreRef, + ) -> LocalManager { + let manager_ctx = Arc::new(ManagerContext::new(poison_store)); LocalManager { manager_ctx, @@ -472,6 +506,7 @@ impl LocalManager { procedure_state, None, procedure.lock_key(), + procedure.poison_keys(), procedure.type_name(), )); let runner = Runner { @@ -721,6 +756,7 @@ pub(crate) mod test_util { ProcedureState::Running, None, LockKey::default(), + PoisonKeys::default(), "ProcedureAdapter", ) } @@ -744,11 +780,17 @@ mod tests { use super::*; use crate::error::{self, Error}; use crate::store::state_store::ObjectStateStore; + use crate::test_util::InMemoryPoisonStore; use crate::{Context, Procedure, Status}; + fn new_test_manager_context() -> ManagerContext { + let poison_manager = Arc::new(InMemoryPoisonStore::default()); + ManagerContext::new(poison_manager) + } + #[test] fn test_manager_context() { - let ctx = ManagerContext::new(); + let ctx = new_test_manager_context(); let meta = Arc::new(test_util::procedure_meta_for_test()); assert!(!ctx.contains_procedure(meta.id)); @@ -764,7 +806,7 @@ mod tests { #[test] fn test_manager_context_insert_duplicate() { - let ctx = ManagerContext::new(); + let ctx = new_test_manager_context(); let meta = Arc::new(test_util::procedure_meta_for_test()); assert!(ctx.try_insert_procedure(meta.clone())); @@ -786,7 +828,7 @@ mod tests { #[test] fn test_procedures_in_tree() { - let ctx = ManagerContext::new(); + let ctx = new_test_manager_context(); let root = Arc::new(test_util::procedure_meta_for_test()); assert!(ctx.try_insert_procedure(root.clone())); @@ -810,6 +852,7 @@ mod tests { struct ProcedureToLoad { content: String, lock_key: LockKey, + poison_keys: PoisonKeys, } #[async_trait] @@ -829,6 +872,10 @@ mod tests { fn lock_key(&self) -> LockKey { self.lock_key.clone() } + + fn poison_keys(&self) -> PoisonKeys { + self.poison_keys.clone() + } } impl ProcedureToLoad { @@ -836,6 +883,7 @@ mod tests { ProcedureToLoad { content: content.to_string(), lock_key: LockKey::default(), + poison_keys: PoisonKeys::default(), } } @@ -858,7 +906,8 @@ mod tests { ..Default::default() }; let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); - let manager = LocalManager::new(config, state_store); + let poison_manager = Arc::new(InMemoryPoisonStore::new()); + let manager = LocalManager::new(config, state_store, poison_manager); manager.manager_ctx.start(); manager @@ -882,7 +931,8 @@ mod tests { ..Default::default() }; let state_store = Arc::new(ObjectStateStore::new(object_store.clone())); - let manager = LocalManager::new(config, state_store); + let poison_manager = Arc::new(InMemoryPoisonStore::new()); + let manager = LocalManager::new(config, state_store, poison_manager); manager.manager_ctx.start(); manager @@ -935,7 +985,8 @@ mod tests { ..Default::default() }; let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); - let manager = LocalManager::new(config, state_store); + let poison_manager = Arc::new(InMemoryPoisonStore::new()); + let manager = LocalManager::new(config, state_store, poison_manager); manager.manager_ctx.start(); let procedure_id = ProcedureId::random(); @@ -986,7 +1037,8 @@ mod tests { ..Default::default() }; let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); - let manager = LocalManager::new(config, state_store); + let poison_manager = Arc::new(InMemoryPoisonStore::new()); + let manager = LocalManager::new(config, state_store, poison_manager); manager.manager_ctx.start(); #[derive(Debug)] @@ -1025,6 +1077,10 @@ mod tests { fn lock_key(&self) -> LockKey { LockKey::single_exclusive("test.submit") } + + fn poison_keys(&self) -> PoisonKeys { + PoisonKeys::default() + } } let check_procedure = |procedure| async { @@ -1062,7 +1118,8 @@ mod tests { ..Default::default() }; let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); - let manager = LocalManager::new(config, state_store); + let poison_manager = Arc::new(InMemoryPoisonStore::new()); + let manager = LocalManager::new(config, state_store, poison_manager); let mut procedure = ProcedureToLoad::new("submit"); procedure.lock_key = LockKey::single_exclusive("test.submit"); @@ -1089,7 +1146,8 @@ mod tests { ..Default::default() }; let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); - let manager = LocalManager::new(config, state_store); + let poison_manager = Arc::new(InMemoryPoisonStore::new()); + let manager = LocalManager::new(config, state_store, poison_manager); manager.start().await.unwrap(); manager.stop().await.unwrap(); @@ -1125,7 +1183,8 @@ mod tests { max_running_procedures: 128, }; let state_store = Arc::new(ObjectStateStore::new(object_store.clone())); - let manager = LocalManager::new(config, state_store); + let poison_manager = Arc::new(InMemoryPoisonStore::new()); + let manager = LocalManager::new(config, state_store, poison_manager); manager.manager_ctx.set_running(); let mut procedure = ProcedureToLoad::new("submit"); @@ -1206,7 +1265,8 @@ mod tests { ..Default::default() }; let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); - let manager = LocalManager::new(config, state_store); + let poison_manager = Arc::new(InMemoryPoisonStore::new()); + let manager = LocalManager::new(config, state_store, poison_manager); manager.manager_ctx.set_running(); manager @@ -1263,6 +1323,7 @@ mod tests { content: String, lock_key: LockKey, notify: Option>, + poison_keys: PoisonKeys, } #[async_trait] @@ -1287,6 +1348,10 @@ mod tests { self.notify.as_ref().unwrap().notify_one(); Ok(()) } + + fn poison_keys(&self) -> PoisonKeys { + self.poison_keys.clone() + } } impl ProcedureToRecover { @@ -1294,6 +1359,7 @@ mod tests { ProcedureToRecover { content: content.to_string(), lock_key: LockKey::default(), + poison_keys: PoisonKeys::default(), notify: None, } } @@ -1303,6 +1369,7 @@ mod tests { let procedure = ProcedureToRecover { content: json.to_string(), lock_key: LockKey::default(), + poison_keys: PoisonKeys::default(), notify: Some(notify.clone()), }; Ok(Box::new(procedure) as _) @@ -1323,7 +1390,8 @@ mod tests { ..Default::default() }; let state_store = Arc::new(ObjectStateStore::new(object_store.clone())); - let manager = LocalManager::new(config, state_store); + let poison_manager = Arc::new(InMemoryPoisonStore::new()); + let manager = LocalManager::new(config, state_store, poison_manager); manager.manager_ctx.start(); let notify = Arc::new(Notify::new()); diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs index a7972d8860..cf322f0967 100644 --- a/src/common/procedure/src/local/runner.rs +++ b/src/common/procedure/src/local/runner.rs @@ -238,11 +238,34 @@ impl Runner { } ProcedureState::Done { .. } => return, ProcedureState::Failed { .. } => return, + ProcedureState::Poisoned { .. } => return, } self.execute_once(ctx).await; } } + async fn clean_poisons(&mut self) -> Result<()> { + let mut error = None; + for key in self.meta.poison_keys.iter() { + let key = key.to_string(); + if let Err(e) = self + .manager_ctx + .poison_manager + .delete_poison(key, self.meta.id.to_string()) + .await + { + error!(e; "Failed to clean poisons for procedure: {}", self.meta.id); + error = Some(e); + } + } + + // returns the last error if any. + if let Some(e) = error { + return Err(e); + } + Ok(()) + } + async fn rollback(&mut self, ctx: &Context, err: Arc) { if self.procedure.rollback_supported() { if let Err(e) = self.procedure.rollback(ctx).await { @@ -255,7 +278,7 @@ impl Runner { } async fn prepare_rollback(&mut self, err: Arc) { - if let Err(e) = self.write_procedure_state(err.to_string()).await { + if let Err(e) = self.write_rollback_procedure_state(err.to_string()).await { self.meta .set_state(ProcedureState::prepare_rollback(Arc::new(e))); return; @@ -288,26 +311,48 @@ impl Runner { return; } + // Cleans poisons before persist. + if status.need_clean_poisons() { + if let Err(e) = self.clean_poisons().await { + error!(e; "Failed to clean poison for procedure: {}", self.meta.id); + self.meta.set_state(ProcedureState::retrying(Arc::new(e))); + return; + } + } + if status.need_persist() { - if let Err(err) = self.persist_procedure().await { - self.meta.set_state(ProcedureState::retrying(Arc::new(err))); + if let Err(e) = self.persist_procedure().await { + error!(e; "Failed to persist procedure: {}", self.meta.id); + self.meta.set_state(ProcedureState::retrying(Arc::new(e))); return; } } match status { - Status::Executing { .. } => (), + Status::Executing { .. } => {} Status::Suspended { subprocedures, .. } => { self.on_suspended(subprocedures).await; } Status::Done { output } => { if let Err(e) = self.commit_procedure().await { + error!(e; "Failed to commit procedure: {}", self.meta.id); self.meta.set_state(ProcedureState::retrying(Arc::new(e))); return; } self.done(output); } + Status::Poisoned { error, keys } => { + error!( + error; + "Procedure {}-{} is poisoned, keys: {:?}", + self.procedure.type_name(), + self.meta.id, + keys, + ); + self.meta + .set_state(ProcedureState::poisoned(keys, Arc::new(error))); + } } } Err(e) => { @@ -339,7 +384,9 @@ impl Runner { } ProcedureState::PrepareRollback { error } => self.prepare_rollback(error).await, ProcedureState::RollingBack { error } => self.rollback(ctx, error).await, - ProcedureState::Failed { .. } | ProcedureState::Done { .. } => (), + ProcedureState::Failed { .. } + | ProcedureState::Done { .. } + | ProcedureState::Poisoned { .. } => (), } } @@ -362,6 +409,7 @@ impl Runner { procedure_state, Some(self.meta.id), procedure.lock_key(), + procedure.poison_keys(), procedure.type_name(), )); let runner = Runner { @@ -484,7 +532,7 @@ impl Runner { Ok(()) } - async fn write_procedure_state(&mut self, error: String) -> Result<()> { + async fn write_rollback_procedure_state(&mut self, error: String) -> Result<()> { // Persists procedure state let type_name = self.procedure.type_name().to_string(); let data = self.procedure.dump()?; @@ -539,8 +587,10 @@ mod tests { use super::*; use crate::local::test_util; + use crate::procedure::PoisonKeys; use crate::store::proc_path; - use crate::{ContextProvider, Error, LockKey, Procedure}; + use crate::test_util::InMemoryPoisonStore; + use crate::{ContextProvider, Error, LockKey, PoisonKey, Procedure}; const ROOT_ID: &str = "9f805a1f-05f7-490c-9f91-bd56e3cc54c1"; @@ -552,7 +602,9 @@ mod tests { Runner { meta, procedure, - manager_ctx: Arc::new(ManagerContext::new()), + manager_ctx: Arc::new(ManagerContext::new( + Arc::new(InMemoryPoisonStore::default()), + )), step: 0, exponential_builder: ExponentialBuilder::default(), store, @@ -577,6 +629,16 @@ mod tests { assert_eq!(files, files_in_dir); } + fn context_with_provider( + procedure_id: ProcedureId, + provider: Arc, + ) -> Context { + Context { + procedure_id, + provider, + } + } + fn context_without_provider(procedure_id: ProcedureId) -> Context { struct MockProvider; @@ -588,6 +650,14 @@ mod tests { ) -> Result> { unimplemented!() } + + async fn try_put_poison( + &self, + _key: &PoisonKey, + _procedure_id: ProcedureId, + ) -> Result<()> { + unimplemented!() + } } Context { @@ -601,6 +671,7 @@ mod tests { struct ProcedureAdapter { data: String, lock_key: LockKey, + poison_keys: PoisonKeys, exec_fn: F, rollback_fn: Option, } @@ -610,6 +681,7 @@ mod tests { let mut meta = test_util::procedure_meta_for_test(); meta.id = ProcedureId::parse_str(uuid).unwrap(); meta.lock_key = self.lock_key.clone(); + meta.poison_keys = self.poison_keys.clone(); Arc::new(meta) } @@ -647,6 +719,10 @@ mod tests { fn lock_key(&self) -> LockKey { self.lock_key.clone() } + + fn poison_keys(&self) -> PoisonKeys { + self.poison_keys.clone() + } } async fn execute_once_normal(persist: bool, first_files: &[&str], second_files: &[&str]) { @@ -655,7 +731,7 @@ mod tests { times += 1; async move { if times == 1 { - Ok(Status::Executing { persist }) + Ok(Status::executing(persist)) } else { Ok(Status::done()) } @@ -665,6 +741,7 @@ mod tests { let normal = ProcedureAdapter { data: "normal".to_string(), lock_key: LockKey::single_exclusive("catalog.schema.table"), + poison_keys: PoisonKeys::default(), exec_fn, rollback_fn: None, }; @@ -729,6 +806,7 @@ mod tests { let suspend = ProcedureAdapter { data: "suspend".to_string(), lock_key: LockKey::single_exclusive("catalog.schema.table"), + poison_keys: PoisonKeys::default(), exec_fn, rollback_fn: None, }; @@ -753,7 +831,7 @@ mod tests { async move { if times == 1 { time::sleep(Duration::from_millis(200)).await; - Ok(Status::Executing { persist: true }) + Ok(Status::executing(true)) } else { Ok(Status::done()) } @@ -763,6 +841,7 @@ mod tests { let child = ProcedureAdapter { data: "child".to_string(), lock_key: LockKey::new_exclusive(keys.iter().map(|k| k.to_string())), + poison_keys: PoisonKeys::default(), exec_fn, rollback_fn: None, }; @@ -832,6 +911,7 @@ mod tests { let parent = ProcedureAdapter { data: "parent".to_string(), lock_key: LockKey::single_exclusive("catalog.schema.table"), + poison_keys: PoisonKeys::default(), exec_fn, rollback_fn: None, }; @@ -843,7 +923,8 @@ mod tests { let object_store = test_util::new_object_store(&dir); let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store.clone()); - let manager_ctx = Arc::new(ManagerContext::new()); + let poison_manager = Arc::new(InMemoryPoisonStore::default()); + let manager_ctx = Arc::new(ManagerContext::new(poison_manager)); manager_ctx.start(); // Manually add this procedure to the manager ctx. assert!(manager_ctx.try_insert_procedure(meta)); @@ -875,10 +956,11 @@ mod tests { #[tokio::test] async fn test_running_is_stopped() { - let exec_fn = move |_| async move { Ok(Status::Executing { persist: true }) }.boxed(); + let exec_fn = move |_| async move { Ok(Status::executing(true)) }.boxed(); let normal = ProcedureAdapter { data: "normal".to_string(), lock_key: LockKey::single_exclusive("catalog.schema.table"), + poison_keys: PoisonKeys::default(), exec_fn, rollback_fn: None, }; @@ -923,6 +1005,7 @@ mod tests { let normal = ProcedureAdapter { data: "fail".to_string(), lock_key: LockKey::single_exclusive("catalog.schema.table"), + poison_keys: PoisonKeys::default(), exec_fn, rollback_fn: None, }; @@ -949,6 +1032,7 @@ mod tests { let fail = ProcedureAdapter { data: "fail".to_string(), lock_key: LockKey::single_exclusive("catalog.schema.table"), + poison_keys: PoisonKeys::default(), exec_fn, rollback_fn: None, }; @@ -985,6 +1069,7 @@ mod tests { let fail = ProcedureAdapter { data: "fail".to_string(), lock_key: LockKey::single_exclusive("catalog.schema.table"), + poison_keys: PoisonKeys::default(), exec_fn, rollback_fn: Some(Box::new(rollback_fn)), }; @@ -1036,6 +1121,7 @@ mod tests { let retry_later = ProcedureAdapter { data: "retry_later".to_string(), lock_key: LockKey::single_exclusive("catalog.schema.table"), + poison_keys: PoisonKeys::default(), exec_fn, rollback_fn: None, }; @@ -1072,6 +1158,7 @@ mod tests { let exceed_max_retry_later = ProcedureAdapter { data: "exceed_max_retry_later".to_string(), lock_key: LockKey::single_exclusive("catalog.schema.table"), + poison_keys: PoisonKeys::default(), exec_fn, rollback_fn: None, }; @@ -1107,6 +1194,7 @@ mod tests { let exceed_max_retry_later = ProcedureAdapter { data: "exceed_max_rollback".to_string(), lock_key: LockKey::single_exclusive("catalog.schema.table"), + poison_keys: PoisonKeys::default(), exec_fn, rollback_fn: Some(Box::new(rollback_fn)), }; @@ -1149,6 +1237,7 @@ mod tests { let retry_later = ProcedureAdapter { data: "rollback_after_retry_fail".to_string(), lock_key: LockKey::single_exclusive("catalog.schema.table"), + poison_keys: PoisonKeys::default(), exec_fn, rollback_fn: Some(Box::new(rollback_fn)), }; @@ -1193,6 +1282,7 @@ mod tests { let fail = ProcedureAdapter { data: "fail".to_string(), lock_key: LockKey::single_exclusive("catalog.schema.table.region-0"), + poison_keys: PoisonKeys::default(), exec_fn, rollback_fn: None, }; @@ -1228,6 +1318,7 @@ mod tests { let parent = ProcedureAdapter { data: "parent".to_string(), lock_key: LockKey::single_exclusive("catalog.schema.table"), + poison_keys: PoisonKeys::default(), exec_fn, rollback_fn: None, }; @@ -1238,7 +1329,8 @@ mod tests { let object_store = test_util::new_object_store(&dir); let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store); - let manager_ctx = Arc::new(ManagerContext::new()); + let poison_manager = Arc::new(InMemoryPoisonStore::default()); + let manager_ctx = Arc::new(ManagerContext::new(poison_manager)); manager_ctx.start(); // Manually add this procedure to the manager ctx. assert!(manager_ctx.try_insert_procedure(meta.clone())); @@ -1251,4 +1343,235 @@ mod tests { let err = meta.state().error().unwrap().output_msg(); assert!(err.contains("subprocedure failed"), "{err}"); } + + #[tokio::test] + async fn test_execute_with_clean_poisons() { + common_telemetry::init_default_ut_logging(); + let mut times = 0; + let poison_key = PoisonKey::new("table/1024"); + let moved_poison_key = poison_key.clone(); + let exec_fn = move |ctx: Context| { + times += 1; + let poison_key = moved_poison_key.clone(); + async move { + if times == 1 { + // Put the poison to the context. + ctx.provider + .try_put_poison(&poison_key, ctx.procedure_id) + .await + .unwrap(); + + Ok(Status::executing_with_clean_poisons(true)) + } else { + Ok(Status::Poisoned { + keys: PoisonKeys::new(vec![poison_key.clone()]), + error: Error::external(MockError::new(StatusCode::Unexpected)), + }) + } + } + .boxed() + }; + let poison = ProcedureAdapter { + data: "poison".to_string(), + lock_key: LockKey::single_exclusive("catalog.schema.table"), + poison_keys: PoisonKeys::new(vec![poison_key.clone()]), + exec_fn, + rollback_fn: None, + }; + + let dir = create_temp_dir("clean_poisons"); + let meta = poison.new_meta(ROOT_ID); + + let object_store = test_util::new_object_store(&dir); + let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); + let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone()); + + // Use the manager ctx as the context provider. + let ctx = context_with_provider( + meta.id, + runner.manager_ctx.clone() as Arc, + ); + // Manually add this procedure to the manager ctx. + runner + .manager_ctx + .procedures + .write() + .unwrap() + .insert(meta.id, runner.meta.clone()); + + runner.manager_ctx.start(); + runner.execute_once(&ctx).await; + let state = runner.meta.state(); + assert!(state.is_running(), "{state:?}"); + + let procedure_id = runner + .manager_ctx + .poison_manager + .get_poison(&poison_key.to_string()) + .await + .unwrap(); + // poison key should be deleted. + assert!(procedure_id.is_none()); + } + + #[tokio::test] + async fn test_execute_failed_after_set_poison() { + let mut times = 0; + let poison_key = PoisonKey::new("table/1024"); + let moved_poison_key = poison_key.clone(); + let exec_fn = move |ctx: Context| { + times += 1; + let poison_key = moved_poison_key.clone(); + async move { + if times == 1 { + Ok(Status::executing(true)) + } else { + // Put the poison to the context. + ctx.provider + .try_put_poison(&poison_key, ctx.procedure_id) + .await + .unwrap(); + Err(Error::external(MockError::new(StatusCode::Unexpected))) + } + } + .boxed() + }; + let poison = ProcedureAdapter { + data: "poison".to_string(), + lock_key: LockKey::single_exclusive("catalog.schema.table"), + poison_keys: PoisonKeys::new(vec![poison_key.clone()]), + exec_fn, + rollback_fn: None, + }; + + let dir = create_temp_dir("poison"); + let meta = poison.new_meta(ROOT_ID); + + let object_store = test_util::new_object_store(&dir); + let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); + let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone()); + + // Use the manager ctx as the context provider. + let ctx = context_with_provider( + meta.id, + runner.manager_ctx.clone() as Arc, + ); + // Manually add this procedure to the manager ctx. + runner + .manager_ctx + .procedures + .write() + .unwrap() + .insert(meta.id, runner.meta.clone()); + + runner.manager_ctx.start(); + runner.execute_once(&ctx).await; + let state = runner.meta.state(); + assert!(state.is_running(), "{state:?}"); + + runner.execute_once(&ctx).await; + let state = runner.meta.state(); + assert!(state.is_prepare_rollback(), "{state:?}"); + assert!(meta.state().is_prepare_rollback()); + + runner.execute_once(&ctx).await; + let state = runner.meta.state(); + assert!(state.is_failed(), "{state:?}"); + assert!(meta.state().is_failed()); + + // Check the poison is set. + let procedure_id = runner + .manager_ctx + .poison_manager + .get_poison(&poison_key.to_string()) + .await + .unwrap() + .unwrap(); + + // If the procedure is poisoned, the poison key shouldn't be deleted. + assert_eq!(&procedure_id.to_string(), ROOT_ID); + } + + #[tokio::test] + async fn test_execute_poisoned() { + let mut times = 0; + let poison_key = PoisonKey::new("table/1024"); + let moved_poison_key = poison_key.clone(); + let exec_fn = move |ctx: Context| { + times += 1; + let poison_key = moved_poison_key.clone(); + async move { + if times == 1 { + Ok(Status::executing(true)) + } else { + // Put the poison to the context. + ctx.provider + .try_put_poison(&poison_key, ctx.procedure_id) + .await + .unwrap(); + Ok(Status::Poisoned { + keys: PoisonKeys::new(vec![poison_key.clone()]), + error: Error::external(MockError::new(StatusCode::Unexpected)), + }) + } + } + .boxed() + }; + let poison = ProcedureAdapter { + data: "poison".to_string(), + lock_key: LockKey::single_exclusive("catalog.schema.table"), + poison_keys: PoisonKeys::new(vec![poison_key.clone()]), + exec_fn, + rollback_fn: None, + }; + + let dir = create_temp_dir("poison"); + let meta = poison.new_meta(ROOT_ID); + + let object_store = test_util::new_object_store(&dir); + let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); + let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone()); + + // Use the manager ctx as the context provider. + let ctx = context_with_provider( + meta.id, + runner.manager_ctx.clone() as Arc, + ); + // Manually add this procedure to the manager ctx. + runner + .manager_ctx + .procedures + .write() + .unwrap() + .insert(meta.id, runner.meta.clone()); + + runner.manager_ctx.start(); + runner.execute_once(&ctx).await; + let state = runner.meta.state(); + assert!(state.is_running(), "{state:?}"); + + runner.execute_once(&ctx).await; + let state = runner.meta.state(); + assert!(state.is_poisoned(), "{state:?}"); + assert!(meta.state().is_poisoned()); + check_files( + &object_store, + &procedure_store, + ctx.procedure_id, + &["0000000000.step"], + ) + .await; + + // Check the poison is set. + let procedure_id = runner + .manager_ctx + .poison_manager + .get_poison(&poison_key.to_string()) + .await + .unwrap() + .unwrap(); + + // If the procedure is poisoned, the poison key shouldn't be deleted. + assert_eq!(procedure_id, ROOT_ID); + } } diff --git a/src/common/procedure/src/procedure.rs b/src/common/procedure/src/procedure.rs index ddf5dc74a3..03a855c6dd 100644 --- a/src/common/procedure/src/procedure.rs +++ b/src/common/procedure/src/procedure.rs @@ -14,6 +14,7 @@ use std::any::Any; use std::fmt; +use std::fmt::Display; use std::str::FromStr; use std::sync::Arc; @@ -35,6 +36,8 @@ pub enum Status { Executing { /// Whether the framework needs to persist the procedure. persist: bool, + /// Whether the framework needs to clean the poisons. + clean_poisons: bool, }, /// The procedure has suspended itself and is waiting for subprocedures. Suspended { @@ -42,6 +45,13 @@ pub enum Status { /// Whether the framework needs to persist the procedure. persist: bool, }, + /// The procedure is poisoned. + Poisoned { + /// The keys that cause the procedure to be poisoned. + keys: PoisonKeys, + /// The error that cause the procedure to be poisoned. + error: Error, + }, /// the procedure is done. Done { output: Option }, } @@ -49,7 +59,18 @@ pub enum Status { impl Status { /// Returns a [Status::Executing] with given `persist` flag. pub fn executing(persist: bool) -> Status { - Status::Executing { persist } + Status::Executing { + persist, + clean_poisons: false, + } + } + + /// Returns a [Status::Executing] with given `persist` flag and clean poisons. + pub fn executing_with_clean_poisons(persist: bool) -> Status { + Status::Executing { + persist, + clean_poisons: true, + } } /// Returns a [Status::Done] without output. @@ -86,11 +107,20 @@ impl Status { /// Returns `true` if the procedure needs the framework to persist its intermediate state. pub fn need_persist(&self) -> bool { - // If the procedure is done, the framework doesn't need to persist the procedure - // anymore. It only needs to mark the procedure as committed. match self { - Status::Executing { persist } | Status::Suspended { persist, .. } => *persist, - Status::Done { .. } => false, + // If the procedure is done/poisoned, the framework doesn't need to persist the procedure + // anymore. It only needs to mark the procedure as committed. + Status::Executing { persist, .. } | Status::Suspended { persist, .. } => *persist, + Status::Done { .. } | Status::Poisoned { .. } => false, + } + } + + /// Returns `true` if the framework needs to clean the poisons. + pub fn need_clean_poisons(&self) -> bool { + match self { + Status::Executing { clean_poisons, .. } => *clean_poisons, + Status::Done { .. } => true, + _ => false, } } } @@ -100,6 +130,12 @@ impl Status { pub trait ContextProvider: Send + Sync { /// Query the procedure state. async fn procedure_state(&self, procedure_id: ProcedureId) -> Result>; + + /// Try to put a poison key for a procedure. + /// + /// This method is used to mark a resource as being operated on by a procedure. + /// If the poison key already exists with a different value, the operation will fail. + async fn try_put_poison(&self, key: &PoisonKey, procedure_id: ProcedureId) -> Result<()>; } /// Reference-counted pointer to [ContextProvider]. @@ -147,6 +183,11 @@ pub trait Procedure: Send { /// Returns the [LockKey] that this procedure needs to acquire. fn lock_key(&self) -> LockKey; + + /// Returns the [PoisonKeys] that may cause this procedure to become poisoned during execution. + fn poison_keys(&self) -> PoisonKeys { + PoisonKeys::default() + } } #[async_trait] @@ -174,6 +215,54 @@ impl Procedure for Box { fn lock_key(&self) -> LockKey { (**self).lock_key() } + + fn poison_keys(&self) -> PoisonKeys { + (**self).poison_keys() + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct PoisonKey(String); + +impl Display for PoisonKey { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + +impl PoisonKey { + /// Creates a new [PoisonKey] from a [String]. + pub fn new(key: impl Into) -> Self { + Self(key.into()) + } +} + +/// A collection of [PoisonKey]s. +/// +/// This type is used to represent the keys that may cause the procedure to become poisoned during execution. +#[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Default)] +pub struct PoisonKeys(SmallVec<[PoisonKey; 2]>); + +impl PoisonKeys { + /// Creates a new [PoisonKeys] from a [String]. + pub fn single(key: impl Into) -> Self { + Self(smallvec![PoisonKey::new(key)]) + } + + /// Creates a new [PoisonKeys] from a [PoisonKey]. + pub fn new(keys: impl IntoIterator) -> Self { + Self(keys.into_iter().collect()) + } + + /// Returns `true` if the [PoisonKeys] contains the given [PoisonKey]. + pub fn contains(&self, key: &PoisonKey) -> bool { + self.0.contains(key) + } + + /// Returns an iterator over the [PoisonKey]s. + pub fn iter(&self) -> impl Iterator { + self.0.iter() + } } #[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)] @@ -325,6 +414,8 @@ pub enum ProcedureState { RollingBack { error: Arc }, /// The procedure is failed and cannot proceed anymore. Failed { error: Arc }, + /// The procedure is poisoned. + Poisoned { keys: PoisonKeys, error: Arc }, } impl ProcedureState { @@ -348,6 +439,11 @@ impl ProcedureState { ProcedureState::Retrying { error } } + /// Returns a [ProcedureState] with poisoned state. + pub fn poisoned(keys: PoisonKeys, error: Arc) -> ProcedureState { + ProcedureState::Poisoned { keys, error } + } + /// Returns true if the procedure state is running. pub fn is_running(&self) -> bool { matches!(self, ProcedureState::Running) @@ -358,6 +454,11 @@ impl ProcedureState { matches!(self, ProcedureState::Done { .. }) } + /// Returns true if the procedure state is poisoned. + pub fn is_poisoned(&self) -> bool { + matches!(self, ProcedureState::Poisoned { .. }) + } + /// Returns true if the procedure state failed. pub fn is_failed(&self) -> bool { matches!(self, ProcedureState::Failed { .. }) @@ -384,6 +485,7 @@ impl ProcedureState { ProcedureState::Failed { error } => Some(error), ProcedureState::Retrying { error } => Some(error), ProcedureState::RollingBack { error } => Some(error), + ProcedureState::Poisoned { error, .. } => Some(error), _ => None, } } @@ -397,6 +499,7 @@ impl ProcedureState { ProcedureState::Failed { .. } => "Failed", ProcedureState::PrepareRollback { .. } => "PrepareRollback", ProcedureState::RollingBack { .. } => "RollingBack", + ProcedureState::Poisoned { .. } => "Poisoned", } } } @@ -470,12 +573,18 @@ mod tests { #[test] fn test_status() { - let status = Status::Executing { persist: false }; + let status = Status::executing(false); assert!(!status.need_persist()); - let status = Status::Executing { persist: true }; + let status = Status::executing(true); assert!(status.need_persist()); + let status = Status::executing_with_clean_poisons(false); + assert!(status.need_clean_poisons()); + + let status = Status::executing_with_clean_poisons(true); + assert!(status.need_clean_poisons()); + let status = Status::Suspended { subprocedures: Vec::new(), persist: false, @@ -490,6 +599,7 @@ mod tests { let status = Status::done(); assert!(!status.need_persist()); + assert!(status.need_clean_poisons()); } #[test] diff --git a/src/common/procedure/src/store.rs b/src/common/procedure/src/store.rs index cebbe9b009..3d6c0c7256 100644 --- a/src/common/procedure/src/store.rs +++ b/src/common/procedure/src/store.rs @@ -24,6 +24,7 @@ use crate::error::{Result, ToJsonSnafu}; pub(crate) use crate::store::state_store::StateStoreRef; use crate::ProcedureId; +pub mod poison_store; pub mod state_store; pub mod util; @@ -341,6 +342,7 @@ mod tests { use object_store::ObjectStore; + use crate::procedure::PoisonKeys; use crate::store::state_store::ObjectStateStore; use crate::BoxedProcedure; @@ -503,6 +505,10 @@ mod tests { fn lock_key(&self) -> LockKey { LockKey::default() } + + fn poison_keys(&self) -> PoisonKeys { + PoisonKeys::default() + } } #[tokio::test] diff --git a/src/common/procedure/src/store/poison_store.rs b/src/common/procedure/src/store/poison_store.rs new file mode 100644 index 0000000000..88cf6723fe --- /dev/null +++ b/src/common/procedure/src/store/poison_store.rs @@ -0,0 +1,59 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use async_trait::async_trait; + +use crate::error::Result; + +pub type PoisonStoreRef = Arc; + +/// Poison store. +/// +/// This trait is used to manage the state of operations on resources, particularly +/// when an operation encounters an unrecoverable error, potentially leading to +/// metadata inconsistency. In such cases, manual intervention is required to +/// resolve the issue before any further operations can be performed on the resource. +/// +/// ## Behavior: +/// - **Insertion**: When an operation begins on a resource, a "poison" key is inserted +/// into the state store to indicate the operation is in progress. +/// - **Deletion**: If the operation completes successfully or +/// other cases can ensure the resource is in a consistent state, the poison key is removed +/// from the state store, indicating the resource is in a consistent state. +/// - **Failure Handling**: +/// - If the operation fails or other cases may lead to metadata inconsistency, +/// the poison key remains in the state store. +/// - The presence of this key indicates that the resource has encountered an +/// unrecoverable error and the metadata may be inconsistent. +/// - New operations on the same resource are rejected until the resource is +/// manually recovered and the poison key is removed. +#[async_trait] +pub trait PoisonStore: Send + Sync { + /// Try to put the poison key. + /// + /// If the poison key already exists with a different value, the operation will fail. + async fn try_put_poison(&self, key: String, token: String) -> Result<()>; + + /// Delete the poison key. + /// + /// If the poison key exists with a different value, the operation will fail. + async fn delete_poison(&self, key: String, token: String) -> Result<()>; + + /// Get the poison key. + /// + /// If the poison key does not exist, the operation will return `None`. + async fn get_poison(&self, key: &str) -> Result>; +} diff --git a/src/common/procedure/src/test_util.rs b/src/common/procedure/src/test_util.rs new file mode 100644 index 0000000000..21edbef333 --- /dev/null +++ b/src/common/procedure/src/test_util.rs @@ -0,0 +1,85 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; + +use snafu::ensure; + +use super::*; +use crate::error; +use crate::store::poison_store::PoisonStore; + +/// A poison store that uses an in-memory map to store the poison state. +#[derive(Debug, Default)] +pub struct InMemoryPoisonStore { + map: Arc>>, +} + +impl InMemoryPoisonStore { + /// Create a new in-memory poison manager. + pub fn new() -> Self { + Self::default() + } +} + +#[async_trait::async_trait] +impl PoisonStore for InMemoryPoisonStore { + async fn try_put_poison(&self, key: String, token: String) -> Result<()> { + let mut map = self.map.write().unwrap(); + match map.entry(key) { + Entry::Vacant(v) => { + v.insert(token.to_string()); + } + Entry::Occupied(o) => { + let value = o.get(); + ensure!( + value == &token, + error::UnexpectedSnafu { + err_msg: format!("The poison is already set by other token {}", value) + } + ); + } + } + Ok(()) + } + + async fn delete_poison(&self, key: String, token: String) -> Result<()> { + let mut map = self.map.write().unwrap(); + match map.entry(key) { + Entry::Vacant(_) => { + // do nothing + } + Entry::Occupied(o) => { + let value = o.get(); + ensure!( + value == &token, + error::UnexpectedSnafu { + err_msg: format!("The poison is not set by the token {}", value) + } + ); + + o.remove(); + } + } + Ok(()) + } + + async fn get_poison(&self, key: &str) -> Result> { + let map = self.map.read().unwrap(); + let key = key.to_string(); + Ok(map.get(&key).cloned()) + } +} diff --git a/src/common/procedure/src/watcher.rs b/src/common/procedure/src/watcher.rs index 09c4d6de5c..99af9a2dc7 100644 --- a/src/common/procedure/src/watcher.rs +++ b/src/common/procedure/src/watcher.rs @@ -43,6 +43,10 @@ pub async fn wait(watcher: &mut Watcher) -> Result> { ProcedureState::PrepareRollback { error } => { debug!("commit rollback, source: {}", error) } + ProcedureState::Poisoned { error, .. } => { + debug!("poisoned, source: {}", error); + return Err(error.clone()).context(ProcedureExecSnafu); + } } } } @@ -61,7 +65,9 @@ mod tests { use super::*; use crate::error::Error; use crate::local::{test_util, LocalManager, ManagerConfig}; + use crate::procedure::PoisonKeys; use crate::store::state_store::ObjectStateStore; + use crate::test_util::InMemoryPoisonStore; use crate::{ Context, LockKey, Procedure, ProcedureId, ProcedureManager, ProcedureWithId, Status, }; @@ -76,7 +82,8 @@ mod tests { ..Default::default() }; let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir))); - let manager = LocalManager::new(config, state_store); + let poison_manager = Arc::new(InMemoryPoisonStore::default()); + let manager = LocalManager::new(config, state_store, poison_manager); manager.start().await.unwrap(); #[derive(Debug)] @@ -106,6 +113,10 @@ mod tests { fn lock_key(&self) -> LockKey { LockKey::single_exclusive("test.submit") } + + fn poison_keys(&self) -> PoisonKeys { + PoisonKeys::default() + } } let procedure_id = ProcedureId::random(); diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 9153520a44..f672655378 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -113,7 +113,7 @@ impl Instance { .context(error::OpenRaftEngineBackendSnafu)?; let kv_backend = Arc::new(kv_backend); - let state_store = Arc::new(KvStateStore::new(kv_backend.clone())); + let kv_state_store = Arc::new(KvStateStore::new(kv_backend.clone())); let manager_config = ManagerConfig { max_retry_times: procedure_config.max_retry_times, @@ -121,7 +121,11 @@ impl Instance { max_running_procedures: procedure_config.max_running_procedures, ..Default::default() }; - let procedure_manager = Arc::new(LocalManager::new(manager_config, state_store)); + let procedure_manager = Arc::new(LocalManager::new( + manager_config, + kv_state_store.clone(), + kv_state_store, + )); Ok((kv_backend, procedure_manager)) } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index ec0480cd9b..e3c1f58749 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -442,13 +442,20 @@ fn build_procedure_manager( max_running_procedures: options.procedure.max_running_procedures, ..Default::default() }; - let state_store = KvStateStore::new(kv_backend.clone()).with_max_value_size( - options - .procedure - .max_metadata_value_size - .map(|v| v.as_bytes() as usize), + let kv_state_store = Arc::new( + KvStateStore::new(kv_backend.clone()).with_max_value_size( + options + .procedure + .max_metadata_value_size + .map(|v| v.as_bytes() as usize), + ), ); - Arc::new(LocalManager::new(manager_config, Arc::new(state_store))) + + Arc::new(LocalManager::new( + manager_config, + kv_state_store.clone(), + kv_state_store, + )) } impl Default for MetasrvBuilder { diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs index 9a32f9d301..c229f1934a 100644 --- a/src/meta-srv/src/procedure/region_migration/test_util.rs +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -31,6 +31,7 @@ use common_meta::sequence::SequenceBuilder; use common_meta::state_store::KvStateStore; use common_meta::DatanodeId; use common_procedure::local::{LocalManager, ManagerConfig}; +use common_procedure::test_util::InMemoryPoisonStore; use common_procedure::{Context as ProcedureContext, ProcedureId, ProcedureManagerRef, Status}; use common_procedure_test::MockContextProvider; use common_telemetry::debug; @@ -85,7 +86,12 @@ impl TestingEnv { let opening_region_keeper = Arc::new(MemoryRegionKeeper::default()); let state_store = Arc::new(KvStateStore::new(kv_backend.clone())); - let procedure_manager = Arc::new(LocalManager::new(ManagerConfig::default(), state_store)); + let poison_manager = Arc::new(InMemoryPoisonStore::default()); + let procedure_manager = Arc::new(LocalManager::new( + ManagerConfig::default(), + state_store, + poison_manager, + )); Self { table_metadata_manager, diff --git a/src/meta-srv/src/procedure/tests.rs b/src/meta-srv/src/procedure/tests.rs index 2d25094c98..eda7d2c25a 100644 --- a/src/meta-srv/src/procedure/tests.rs +++ b/src/meta-srv/src/procedure/tests.rs @@ -224,7 +224,13 @@ async fn test_on_datanode_create_regions() { }); let status = procedure.on_datanode_create_regions().await.unwrap(); - assert!(matches!(status, Status::Executing { persist: true })); + assert!(matches!( + status, + Status::Executing { + persist: true, + clean_poisons: false, + } + )); assert!(matches!( procedure.creator.data.state, CreateTableState::CreateMetadata @@ -291,7 +297,13 @@ async fn test_on_datanode_create_logical_regions() { procedure.check_tables_already_exist().await.unwrap(); let status = procedure.on_datanode_create_regions().await.unwrap(); - assert!(matches!(status, Status::Executing { persist: true })); + assert!(matches!( + status, + Status::Executing { + persist: true, + clean_poisons: false, + } + )); assert!(matches!( procedure.data.state(), &CreateTablesState::CreateMetadata diff --git a/src/meta-srv/src/procedure/wal_prune.rs b/src/meta-srv/src/procedure/wal_prune.rs index b29a44a7fb..3b93552845 100644 --- a/src/meta-srv/src/procedure/wal_prune.rs +++ b/src/meta-srv/src/procedure/wal_prune.rs @@ -392,7 +392,13 @@ mod tests { // Step 1: Test `on_prepare`. let status = procedure.on_prepare().await.unwrap(); - assert_matches!(status, Status::Executing { persist: true }); + assert_matches!( + status, + Status::Executing { + persist: true, + clean_poisons: false + } + ); assert_matches!(procedure.data.state, WalPruneState::Prune); assert_eq!(procedure.data.min_flushed_entry_id, min_last_entry_id);