feat: introduce poison mechanism for procedure (#5822)

* feat: introduce poison for procedure

* tests: add unit tests

* refactor: minor refactor

* fix: unit tests

* chore: fix unit tests

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* chore: update comments

* chore: introduce `ProcedureStatus::Poisoned`

* chore: upgrade greptime-proto to `2be0f`

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2025-04-07 16:25:13 +08:00
committed by GitHub
parent 7b48ef1e97
commit 917510ffd0
28 changed files with 1226 additions and 91 deletions

3
Cargo.lock generated
View File

@@ -2268,6 +2268,7 @@ version = "0.14.0"
dependencies = [
"async-trait",
"common-procedure",
"snafu 0.8.5",
]
[[package]]
@@ -4690,7 +4691,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=fb8e20ce29afd81835e3ea3c1164c8ce10de2c65#fb8e20ce29afd81835e3ea3c1164c8ce10de2c65"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=2be0f36b3264e28ab0e1c22a980d0bb634eb3a77#2be0f36b3264e28ab0e1c22a980d0bb634eb3a77"
dependencies = [
"prost 0.13.3",
"serde",

View File

@@ -130,7 +130,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "fb8e20ce29afd81835e3ea3c1164c8ce10de2c65" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "2be0f36b3264e28ab0e1c22a980d0bb634eb3a77" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -80,7 +80,13 @@ pub async fn create_logical_table(
let tasks = vec![test_create_logical_table_task(table_name)];
let mut procedure = CreateLogicalTablesProcedure::new(tasks, physical_table_id, ddl_context);
let status = procedure.on_prepare().await.unwrap();
assert_matches!(status, Status::Executing { persist: true });
assert_matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false
}
);
let status = procedure.on_create_metadata().await.unwrap();
assert_matches!(status, Status::Done { .. });

View File

@@ -180,7 +180,13 @@ async fn test_on_prepare() {
let mut procedure = AlterLogicalTablesProcedure::new(tasks, phy_id, ddl_context);
let result = procedure.on_prepare().await;
assert_matches!(result, Ok(Status::Executing { persist: true }));
assert_matches!(
result,
Ok(Status::Executing {
persist: true,
clean_poisons: false
})
);
}
#[tokio::test]
@@ -205,7 +211,13 @@ async fn test_on_update_metadata() {
let mut procedure = AlterLogicalTablesProcedure::new(tasks, phy_id, ddl_context);
let mut status = procedure.on_prepare().await.unwrap();
assert_matches!(status, Status::Executing { persist: true });
assert_matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false
}
);
let ctx = common_procedure::Context {
procedure_id: ProcedureId::random(),
@@ -213,10 +225,22 @@ async fn test_on_update_metadata() {
};
// on_submit_alter_region_requests
status = procedure.execute(&ctx).await.unwrap();
assert_matches!(status, Status::Executing { persist: true });
assert_matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false
}
);
// on_update_metadata
status = procedure.execute(&ctx).await.unwrap();
assert_matches!(status, Status::Executing { persist: true });
assert_matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false
}
);
}
#[tokio::test]
@@ -237,7 +261,13 @@ async fn test_on_part_duplicate_alter_request() {
let mut procedure = AlterLogicalTablesProcedure::new(tasks, phy_id, ddl_context.clone());
let mut status = procedure.on_prepare().await.unwrap();
assert_matches!(status, Status::Executing { persist: true });
assert_matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false
}
);
let ctx = common_procedure::Context {
procedure_id: ProcedureId::random(),
@@ -245,10 +275,22 @@ async fn test_on_part_duplicate_alter_request() {
};
// on_submit_alter_region_requests
status = procedure.execute(&ctx).await.unwrap();
assert_matches!(status, Status::Executing { persist: true });
assert_matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false
}
);
// on_update_metadata
status = procedure.execute(&ctx).await.unwrap();
assert_matches!(status, Status::Executing { persist: true });
assert_matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false
}
);
// re-alter
let tasks = vec![
@@ -270,7 +312,13 @@ async fn test_on_part_duplicate_alter_request() {
let mut procedure = AlterLogicalTablesProcedure::new(tasks, phy_id, ddl_context.clone());
let mut status = procedure.on_prepare().await.unwrap();
assert_matches!(status, Status::Executing { persist: true });
assert_matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false
}
);
let ctx = common_procedure::Context {
procedure_id: ProcedureId::random(),
@@ -278,10 +326,22 @@ async fn test_on_part_duplicate_alter_request() {
};
// on_submit_alter_region_requests
status = procedure.execute(&ctx).await.unwrap();
assert_matches!(status, Status::Executing { persist: true });
assert_matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false
}
);
// on_update_metadata
status = procedure.execute(&ctx).await.unwrap();
assert_matches!(status, Status::Executing { persist: true });
assert_matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false
}
);
let table_name_keys = vec![
TableNameKey::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "table1"),

View File

@@ -69,7 +69,13 @@ async fn test_on_prepare() {
let physical_table_id = table_id;
let mut procedure = CreateLogicalTablesProcedure::new(tasks, physical_table_id, ddl_context);
let status = procedure.on_prepare().await.unwrap();
assert_matches!(status, Status::Executing { persist: true });
assert_matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false
}
);
}
#[tokio::test]
@@ -202,7 +208,13 @@ async fn test_on_prepare_part_logical_tables_exist() {
ddl_context,
);
let status = procedure.on_prepare().await.unwrap();
assert_matches!(status, Status::Executing { persist: true });
assert_matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false
}
);
}
#[tokio::test]
@@ -238,7 +250,13 @@ async fn test_on_create_metadata() {
ddl_context,
);
let status = procedure.on_prepare().await.unwrap();
assert_matches!(status, Status::Executing { persist: true });
assert_matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false
}
);
let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
provider: Arc::new(MockContextProvider::default()),
@@ -294,7 +312,13 @@ async fn test_on_create_metadata_part_logical_tables_exist() {
ddl_context,
);
let status = procedure.on_prepare().await.unwrap();
assert_matches!(status, Status::Executing { persist: true });
assert_matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false
}
);
let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
provider: Arc::new(MockContextProvider::default()),
@@ -339,7 +363,13 @@ async fn test_on_create_metadata_err() {
ddl_context.clone(),
);
let status = procedure.on_prepare().await.unwrap();
assert_matches!(status, Status::Executing { persist: true });
assert_matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false
}
);
let ctx = ProcedureContext {
procedure_id: ProcedureId::random(),
provider: Arc::new(MockContextProvider::default()),

View File

@@ -137,7 +137,13 @@ async fn test_on_prepare_without_create_if_table_exists() {
task.create_table.create_if_not_exists = true;
let mut procedure = CreateTableProcedure::new(task, ddl_context);
let status = procedure.on_prepare().await.unwrap();
assert_matches!(status, Status::Executing { persist: true });
assert_matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false
}
);
assert_eq!(procedure.table_id(), 1024);
}

View File

@@ -153,7 +153,13 @@ async fn test_on_prepare_without_create_if_table_exists() {
task.create_view.create_if_not_exists = true;
let mut procedure = CreateViewProcedure::new(task, ddl_context);
let status = procedure.on_prepare().await.unwrap();
assert_matches!(status, Status::Executing { persist: true });
assert_matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false
}
);
assert_eq!(procedure.view_id(), 1024);
}

View File

@@ -834,6 +834,7 @@ mod tests {
use std::sync::Arc;
use common_procedure::local::LocalManager;
use common_procedure::test_util::InMemoryPoisonStore;
use super::DdlManager;
use crate::cache_invalidator::DummyCacheInvalidator;
@@ -883,7 +884,12 @@ mod tests {
));
let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
let procedure_manager = Arc::new(LocalManager::new(Default::default(), state_store));
let poison_manager = Arc::new(InMemoryPoisonStore::default());
let procedure_manager = Arc::new(LocalManager::new(
Default::default(),
state_store,
poison_manager,
));
let _ = DdlManager::try_new(
DdlContext {

View File

@@ -748,6 +748,18 @@ pub enum Error {
#[snafu(source)]
error: serde_json::Error,
},
#[snafu(display(
"Procedure poison key already exists with a different value, key: {}, value: {}",
key,
value
))]
ProcedurePoisonConflict {
key: String,
value: String,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -766,7 +778,7 @@ impl ErrorExt for Error {
| SerializeToJson { .. }
| DeserializeFromJson { .. } => StatusCode::Internal,
ValueNotExist { .. } => StatusCode::Unexpected,
ValueNotExist { .. } | ProcedurePoisonConflict { .. } => StatusCode::Unexpected,
Unsupported { .. } => StatusCode::Unsupported,

View File

@@ -156,6 +156,7 @@ use crate::kv_backend::txn::{Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::rpc::router::{region_distribution, LeaderState, RegionRoute};
use crate::rpc::store::BatchDeleteRequest;
use crate::state_store::PoisonValue;
use crate::DatanodeId;
pub const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.@#]*";
@@ -1324,7 +1325,8 @@ impl_metadata_value! {
TableFlowValue,
NodeAddressValue,
SchemaNameValue,
FlowStateValue
FlowStateValue,
PoisonValue
}
impl_optional_metadata_value! {

View File

@@ -81,6 +81,7 @@ pub fn procedure_state_to_pb_state(state: &ProcedureState) -> (PbProcedureStatus
ProcedureState::RollingBack { error } => {
(PbProcedureStatus::RollingBack, error.to_string())
}
ProcedureState::Poisoned { error, .. } => (PbProcedureStatus::Poisoned, error.to_string()),
}
}

View File

@@ -14,16 +14,23 @@
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_procedure::error::{DeleteStatesSnafu, ListStateSnafu, PutStateSnafu};
use common_procedure::error::{
DeletePoisonSnafu, DeleteStatesSnafu, GetPoisonSnafu, ListStateSnafu, PutPoisonSnafu,
PutStateSnafu, Result as ProcedureResult,
};
use common_procedure::store::poison_store::PoisonStore;
use common_procedure::store::state_store::{KeySet, KeyValueStream, StateStore};
use common_procedure::store::util::multiple_value_stream;
use common_procedure::Result as ProcedureResult;
use futures::future::try_join_all;
use futures::StreamExt;
use itertools::Itertools;
use snafu::ResultExt;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use crate::error::Result;
use crate::error::{ProcedurePoisonConflictSnafu, Result, UnexpectedSnafu};
use crate::key::txn_helper::TxnOpGetResponseSet;
use crate::key::{DeserializedValueWithBytes, MetadataValue};
use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
use crate::kv_backend::KvBackendRef;
use crate::range_stream::PaginationStream;
use crate::rpc::store::{BatchDeleteRequest, PutRequest, RangeRequest};
@@ -32,11 +39,16 @@ use crate::rpc::KeyValue;
const DELIMITER: &str = "/";
const PROCEDURE_PREFIX: &str = "/__procedure__/";
const PROCEDURE_POISON_KEY_PREFIX: &str = "/__procedure_poison/";
fn with_prefix(key: &str) -> String {
format!("{PROCEDURE_PREFIX}{key}")
}
fn with_poison_prefix(key: &str) -> String {
format!("{}{}", PROCEDURE_POISON_KEY_PREFIX, key)
}
fn strip_prefix(key: &str) -> String {
key.trim_start_matches(PROCEDURE_PREFIX).to_string()
}
@@ -207,8 +219,168 @@ impl StateStore for KvStateStore {
}
}
/// The value of the poison key.
///
/// Each poison value contains a unique token that identifies the procedure.
/// While multiple procedures may use the same poison key (representing the same resource),
/// each procedure will have a distinct token value to differentiate its ownership.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PoisonValue {
token: String,
}
type PoisonDecodeResult = Result<Option<DeserializedValueWithBytes<PoisonValue>>>;
impl KvStateStore {
/// Builds a create poison transaction,
/// it expected the `__procedure_poison/{key}` wasn't occupied.
fn build_create_poison_txn(
&self,
key: &str,
value: &PoisonValue,
) -> Result<(
Txn,
impl FnOnce(&mut TxnOpGetResponseSet) -> PoisonDecodeResult,
)> {
let key = key.as_bytes().to_vec();
let value = value.try_as_raw_value()?;
let txn = Txn::put_if_not_exists(key.clone(), value);
Ok((
txn,
TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(key)),
))
}
/// Builds a delete poison transaction,
/// it expected the `__procedure_poison/{key}` was occupied.
fn build_delete_poison_txn(
&self,
key: &str,
value: PoisonValue,
) -> Result<(
Txn,
impl FnOnce(&mut TxnOpGetResponseSet) -> PoisonDecodeResult,
)> {
let key = key.as_bytes().to_vec();
let value = value.try_as_raw_value()?;
let txn = Txn::new()
.when(vec![Compare::with_value(
key.clone(),
CompareOp::Equal,
value,
)])
.and_then(vec![TxnOp::Delete(key.clone())])
.or_else(vec![TxnOp::Get(key.clone())]);
Ok((
txn,
TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(key)),
))
}
async fn get_poison_inner(&self, key: &str) -> Result<Option<PoisonValue>> {
let key = with_poison_prefix(key);
let value = self.kv_backend.get(key.as_bytes()).await?;
value
.map(|v| PoisonValue::try_from_raw_value(&v.value))
.transpose()
}
/// Put the poison.
///
/// If the poison is already put by other procedure, it will return an error.
async fn set_poison_inner(&self, key: &str, token: &str) -> Result<()> {
let key = with_poison_prefix(key);
let (txn, on_failure) = self.build_create_poison_txn(
&key,
&PoisonValue {
token: token.to_string(),
},
)?;
let mut resp = self.kv_backend.txn(txn).await?;
if !resp.succeeded {
let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
let remote_value = on_failure(&mut set)?
.context(UnexpectedSnafu {
err_msg: "Reads the empty poison value in comparing operation of the put consistency poison",
})?
.into_inner();
ensure!(
remote_value.token == token,
ProcedurePoisonConflictSnafu {
key: &key,
value: &remote_value.token,
}
);
}
Ok(())
}
/// Deletes the poison.
///
/// If the poison is not put by the procedure, it will return an error.
async fn delete_poison_inner(&self, key: &str, token: &str) -> Result<()> {
let key = with_poison_prefix(key);
let (txn, on_failure) = self.build_delete_poison_txn(
&key,
PoisonValue {
token: token.to_string(),
},
)?;
let mut resp = self.kv_backend.txn(txn).await?;
if !resp.succeeded {
let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
let remote_value = on_failure(&mut set)?;
ensure!(
remote_value.is_none(),
ProcedurePoisonConflictSnafu {
key: &key,
value: &remote_value.unwrap().into_inner().token,
}
);
}
Ok(())
}
}
#[async_trait]
impl PoisonStore for KvStateStore {
async fn try_put_poison(&self, key: String, token: String) -> ProcedureResult<()> {
self.set_poison_inner(&key, &token)
.await
.map_err(BoxedError::new)
.context(PutPoisonSnafu { key, token })
}
async fn delete_poison(&self, key: String, token: String) -> ProcedureResult<()> {
self.delete_poison_inner(&key, &token)
.await
.map_err(BoxedError::new)
.context(DeletePoisonSnafu { key, token })
}
async fn get_poison(&self, key: &str) -> ProcedureResult<Option<String>> {
self.get_poison_inner(key)
.await
.map(|v| v.map(|v| v.token))
.map_err(BoxedError::new)
.context(GetPoisonSnafu { key })
}
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::env;
use std::sync::Arc;
@@ -219,6 +391,7 @@ mod tests {
use uuid::Uuid;
use super::*;
use crate::error::Error;
use crate::kv_backend::chroot::ChrootKvBackend;
use crate::kv_backend::etcd::EtcdStore;
use crate::kv_backend::memory::MemoryKvBackend;
@@ -397,4 +570,73 @@ mod tests {
)
.await;
}
#[tokio::test]
async fn test_poison() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let poison_manager = KvStateStore::new(mem_kv.clone());
let key = "table/1";
let token = "expected_token";
poison_manager.set_poison_inner(key, token).await.unwrap();
// Put again, should be ok.
poison_manager.set_poison_inner(key, token).await.unwrap();
// Delete, should be ok.
poison_manager
.delete_poison_inner(key, token)
.await
.unwrap();
// Delete again, should be ok.
poison_manager
.delete_poison_inner(key, token)
.await
.unwrap();
}
#[tokio::test]
async fn test_consistency_poison_failed() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let poison_manager = KvStateStore::new(mem_kv.clone());
let key = "table/1";
let token = "expected_token";
let token2 = "expected_token2";
poison_manager.set_poison_inner(key, token).await.unwrap();
let err = poison_manager
.set_poison_inner(key, token2)
.await
.unwrap_err();
assert_matches!(err, Error::ProcedurePoisonConflict { .. });
let err = poison_manager
.delete_poison_inner(key, token2)
.await
.unwrap_err();
assert_matches!(err, Error::ProcedurePoisonConflict { .. });
}
#[test]
fn test_serialize_deserialize() {
let key = "table/1";
let value = PoisonValue {
token: "expected_token".to_string(),
};
let serialized_key = with_poison_prefix(key).as_bytes().to_vec();
let serialized_value = value.try_as_raw_value().unwrap();
let expected_key = "/__procedure_poison/table/1";
let expected_value = r#"{"token":"expected_token"}"#;
assert_eq!(expected_key.as_bytes(), serialized_key);
assert_eq!(expected_value.as_bytes(), serialized_value);
}
}

View File

@@ -9,4 +9,5 @@ workspace = true
[dependencies]
async-trait.workspace = true
common-procedure.workspace = true
common-procedure = { workspace = true, features = ["testing"] }
snafu.workspace = true

View File

@@ -18,21 +18,27 @@ use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use common_procedure::store::poison_store::PoisonStore;
use common_procedure::test_util::InMemoryPoisonStore;
use common_procedure::{
Context, ContextProvider, Output, Procedure, ProcedureId, ProcedureState, ProcedureWithId,
Result, Status,
Context, ContextProvider, Output, PoisonKey, Procedure, ProcedureId, ProcedureState,
ProcedureWithId, Result, Status,
};
/// A Mock [ContextProvider].
#[derive(Default)]
pub struct MockContextProvider {
states: HashMap<ProcedureId, ProcedureState>,
poison_manager: InMemoryPoisonStore,
}
impl MockContextProvider {
/// Returns a new provider.
pub fn new(states: HashMap<ProcedureId, ProcedureState>) -> MockContextProvider {
MockContextProvider { states }
MockContextProvider {
states,
poison_manager: InMemoryPoisonStore::default(),
}
}
}
@@ -41,6 +47,12 @@ impl ContextProvider for MockContextProvider {
async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>> {
Ok(self.states.get(&procedure_id).cloned())
}
async fn try_put_poison(&self, key: &PoisonKey, procedure_id: ProcedureId) -> Result<()> {
self.poison_manager
.try_put_poison(key.to_string(), procedure_id.to_string())
.await
}
}
/// Executes a procedure until it returns [Status::Done].
@@ -61,6 +73,7 @@ pub async fn execute_procedure_until_done(procedure: &mut dyn Procedure) -> Opti
"Executing subprocedure is unsupported"
),
Status::Done { output } => return output,
Status::Poisoned { .. } => return None,
}
}
}
@@ -88,6 +101,7 @@ pub async fn execute_procedure_once(
false
}
Status::Done { .. } => true,
Status::Poisoned { .. } => false,
}
}
@@ -109,6 +123,7 @@ pub async fn execute_until_suspended_or_done(
Status::Executing { .. } => (),
Status::Suspended { subprocedures, .. } => return Some(subprocedures),
Status::Done { .. } => break,
Status::Poisoned { .. } => unreachable!(),
}
}

View File

@@ -21,6 +21,7 @@ use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};
use crate::procedure::ProcedureId;
use crate::PoisonKey;
/// Procedure error.
#[derive(Snafu)]
@@ -73,6 +74,32 @@ pub enum Error {
source: BoxedError,
},
#[snafu(display("Failed to put poison, key: '{key}', token: '{token}'"))]
PutPoison {
key: String,
token: String,
#[snafu(implicit)]
location: Location,
source: BoxedError,
},
#[snafu(display("Failed to get poison, key: '{key}'"))]
GetPoison {
key: String,
#[snafu(implicit)]
location: Location,
source: BoxedError,
},
#[snafu(display("Failed to delete poison, key: '{key}', token: '{token}'"))]
DeletePoison {
key: String,
token: String,
#[snafu(implicit)]
location: Location,
source: BoxedError,
},
#[snafu(display("Failed to delete {}", key))]
DeleteState {
key: String,
@@ -182,6 +209,21 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Procedure not found, procedure_id: {}", procedure_id))]
ProcedureNotFound {
procedure_id: ProcedureId,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Poison key not defined, key: '{key}', procedure_id: '{procedure_id}'"))]
PoisonKeyNotDefined {
key: PoisonKey,
procedure_id: ProcedureId,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -192,7 +234,10 @@ impl ErrorExt for Error {
Error::External { source, .. }
| Error::PutState { source, .. }
| Error::DeleteStates { source, .. }
| Error::ListState { source, .. } => source.status_code(),
| Error::ListState { source, .. }
| Error::PutPoison { source, .. }
| Error::DeletePoison { source, .. }
| Error::GetPoison { source, .. } => source.status_code(),
Error::ToJson { .. }
| Error::DeleteState { .. }
@@ -200,7 +245,8 @@ impl ErrorExt for Error {
| Error::WaitWatcher { .. }
| Error::RetryLater { .. }
| Error::RollbackProcedureRecovered { .. }
| Error::TooManyRunningProcedures { .. } => StatusCode::Internal,
| Error::TooManyRunningProcedures { .. }
| Error::PoisonKeyNotDefined { .. } => StatusCode::Internal,
Error::RetryTimesExceeded { .. }
| Error::RollbackTimesExceeded { .. }
@@ -212,7 +258,8 @@ impl ErrorExt for Error {
}
Error::ProcedurePanic { .. }
| Error::ParseSegmentKey { .. }
| Error::Unexpected { .. } => StatusCode::Unexpected,
| Error::Unexpected { .. }
| &Error::ProcedureNotFound { .. } => StatusCode::Unexpected,
Error::ProcedureExec { source, .. } => source.status_code(),
Error::StartRemoveOutdatedMetaTask { source, .. }
| Error::StopRemoveOutdatedMetaTask { source, .. } => source.status_code(),

View File

@@ -23,10 +23,13 @@ mod procedure;
pub mod store;
pub mod watcher;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;
pub use crate::error::{Error, Result};
pub use crate::procedure::{
BoxedProcedure, BoxedProcedureLoader, Context, ContextProvider, LockKey, Output, ParseIdError,
Procedure, ProcedureId, ProcedureInfo, ProcedureManager, ProcedureManagerRef, ProcedureState,
ProcedureWithId, Status, StringKey,
PoisonKey, Procedure, ProcedureId, ProcedureInfo, ProcedureManager, ProcedureManagerRef,
ProcedureState, ProcedureWithId, Status, StringKey,
};
pub use crate::watcher::Watcher;

View File

@@ -26,22 +26,23 @@ use backon::ExponentialBuilder;
use common_runtime::{RepeatedTask, TaskFunction};
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::{error, info, tracing};
use snafu::{ensure, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::watch::{self, Receiver, Sender};
use tokio::sync::{Mutex as TokioMutex, Notify};
use self::rwlock::KeyRwLock;
use crate::error::{
self, DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ManagerNotStartSnafu, Result,
StartRemoveOutdatedMetaTaskSnafu, StopRemoveOutdatedMetaTaskSnafu,
TooManyRunningProceduresSnafu,
self, DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ManagerNotStartSnafu,
PoisonKeyNotDefinedSnafu, ProcedureNotFoundSnafu, Result, StartRemoveOutdatedMetaTaskSnafu,
StopRemoveOutdatedMetaTaskSnafu, TooManyRunningProceduresSnafu,
};
use crate::local::runner::Runner;
use crate::procedure::{BoxedProcedureLoader, InitProcedureState, ProcedureInfo};
use crate::procedure::{BoxedProcedureLoader, InitProcedureState, PoisonKeys, ProcedureInfo};
use crate::store::poison_store::PoisonStoreRef;
use crate::store::{ProcedureMessage, ProcedureMessages, ProcedureStore, StateStoreRef};
use crate::{
BoxedProcedure, ContextProvider, LockKey, ProcedureId, ProcedureManager, ProcedureState,
ProcedureWithId, Watcher,
BoxedProcedure, ContextProvider, LockKey, PoisonKey, ProcedureId, ProcedureManager,
ProcedureState, ProcedureWithId, Watcher,
};
/// The expired time of a procedure's metadata.
@@ -67,6 +68,8 @@ pub(crate) struct ProcedureMeta {
child_notify: Notify,
/// Lock required by this procedure.
lock_key: LockKey,
/// Poison keys that may cause this procedure to become poisoned during execution.
poison_keys: PoisonKeys,
/// Sender to notify the procedure state.
state_sender: Sender<ProcedureState>,
/// Receiver to watch the procedure state.
@@ -85,6 +88,7 @@ impl ProcedureMeta {
procedure_state: ProcedureState,
parent_id: Option<ProcedureId>,
lock_key: LockKey,
poison_keys: PoisonKeys,
type_name: &str,
) -> ProcedureMeta {
let (state_sender, state_receiver) = watch::channel(procedure_state);
@@ -93,6 +97,7 @@ impl ProcedureMeta {
parent_id,
child_notify: Notify::new(),
lock_key,
poison_keys,
state_sender,
state_receiver,
children: Mutex::new(Vec::new()),
@@ -163,6 +168,8 @@ pub(crate) struct ManagerContext {
finished_procedures: Mutex<VecDeque<(ProcedureId, Instant)>>,
/// Running flag.
running: Arc<AtomicBool>,
/// Poison manager.
poison_manager: PoisonStoreRef,
}
#[async_trait]
@@ -170,11 +177,33 @@ impl ContextProvider for ManagerContext {
async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>> {
Ok(self.state(procedure_id))
}
async fn try_put_poison(&self, key: &PoisonKey, procedure_id: ProcedureId) -> Result<()> {
{
// validate the procedure exists
let procedures = self.procedures.read().unwrap();
let procedure = procedures
.get(&procedure_id)
.context(ProcedureNotFoundSnafu { procedure_id })?;
// validate the poison key is defined
ensure!(
procedure.poison_keys.contains(key),
PoisonKeyNotDefinedSnafu {
key: key.clone(),
procedure_id
}
);
}
let key = key.to_string();
let procedure_id = procedure_id.to_string();
self.poison_manager.try_put_poison(key, procedure_id).await
}
}
impl ManagerContext {
/// Returns a new [ManagerContext].
fn new() -> ManagerContext {
fn new(poison_manager: PoisonStoreRef) -> ManagerContext {
ManagerContext {
key_lock: KeyRwLock::new(),
loaders: Mutex::new(HashMap::new()),
@@ -182,6 +211,7 @@ impl ManagerContext {
running_procedures: Mutex::new(HashSet::new()),
finished_procedures: Mutex::new(VecDeque::new()),
running: Arc::new(AtomicBool::new(false)),
poison_manager,
}
}
@@ -433,8 +463,12 @@ pub struct LocalManager {
impl LocalManager {
/// Create a new [LocalManager] with specific `config`.
pub fn new(config: ManagerConfig, state_store: StateStoreRef) -> LocalManager {
let manager_ctx = Arc::new(ManagerContext::new());
pub fn new(
config: ManagerConfig,
state_store: StateStoreRef,
poison_store: PoisonStoreRef,
) -> LocalManager {
let manager_ctx = Arc::new(ManagerContext::new(poison_store));
LocalManager {
manager_ctx,
@@ -472,6 +506,7 @@ impl LocalManager {
procedure_state,
None,
procedure.lock_key(),
procedure.poison_keys(),
procedure.type_name(),
));
let runner = Runner {
@@ -721,6 +756,7 @@ pub(crate) mod test_util {
ProcedureState::Running,
None,
LockKey::default(),
PoisonKeys::default(),
"ProcedureAdapter",
)
}
@@ -744,11 +780,17 @@ mod tests {
use super::*;
use crate::error::{self, Error};
use crate::store::state_store::ObjectStateStore;
use crate::test_util::InMemoryPoisonStore;
use crate::{Context, Procedure, Status};
fn new_test_manager_context() -> ManagerContext {
let poison_manager = Arc::new(InMemoryPoisonStore::default());
ManagerContext::new(poison_manager)
}
#[test]
fn test_manager_context() {
let ctx = ManagerContext::new();
let ctx = new_test_manager_context();
let meta = Arc::new(test_util::procedure_meta_for_test());
assert!(!ctx.contains_procedure(meta.id));
@@ -764,7 +806,7 @@ mod tests {
#[test]
fn test_manager_context_insert_duplicate() {
let ctx = ManagerContext::new();
let ctx = new_test_manager_context();
let meta = Arc::new(test_util::procedure_meta_for_test());
assert!(ctx.try_insert_procedure(meta.clone()));
@@ -786,7 +828,7 @@ mod tests {
#[test]
fn test_procedures_in_tree() {
let ctx = ManagerContext::new();
let ctx = new_test_manager_context();
let root = Arc::new(test_util::procedure_meta_for_test());
assert!(ctx.try_insert_procedure(root.clone()));
@@ -810,6 +852,7 @@ mod tests {
struct ProcedureToLoad {
content: String,
lock_key: LockKey,
poison_keys: PoisonKeys,
}
#[async_trait]
@@ -829,6 +872,10 @@ mod tests {
fn lock_key(&self) -> LockKey {
self.lock_key.clone()
}
fn poison_keys(&self) -> PoisonKeys {
self.poison_keys.clone()
}
}
impl ProcedureToLoad {
@@ -836,6 +883,7 @@ mod tests {
ProcedureToLoad {
content: content.to_string(),
lock_key: LockKey::default(),
poison_keys: PoisonKeys::default(),
}
}
@@ -858,7 +906,8 @@ mod tests {
..Default::default()
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let manager = LocalManager::new(config, state_store);
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager);
manager.manager_ctx.start();
manager
@@ -882,7 +931,8 @@ mod tests {
..Default::default()
};
let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
let manager = LocalManager::new(config, state_store);
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager);
manager.manager_ctx.start();
manager
@@ -935,7 +985,8 @@ mod tests {
..Default::default()
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let manager = LocalManager::new(config, state_store);
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager);
manager.manager_ctx.start();
let procedure_id = ProcedureId::random();
@@ -986,7 +1037,8 @@ mod tests {
..Default::default()
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let manager = LocalManager::new(config, state_store);
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager);
manager.manager_ctx.start();
#[derive(Debug)]
@@ -1025,6 +1077,10 @@ mod tests {
fn lock_key(&self) -> LockKey {
LockKey::single_exclusive("test.submit")
}
fn poison_keys(&self) -> PoisonKeys {
PoisonKeys::default()
}
}
let check_procedure = |procedure| async {
@@ -1062,7 +1118,8 @@ mod tests {
..Default::default()
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let manager = LocalManager::new(config, state_store);
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager);
let mut procedure = ProcedureToLoad::new("submit");
procedure.lock_key = LockKey::single_exclusive("test.submit");
@@ -1089,7 +1146,8 @@ mod tests {
..Default::default()
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let manager = LocalManager::new(config, state_store);
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager);
manager.start().await.unwrap();
manager.stop().await.unwrap();
@@ -1125,7 +1183,8 @@ mod tests {
max_running_procedures: 128,
};
let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
let manager = LocalManager::new(config, state_store);
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager);
manager.manager_ctx.set_running();
let mut procedure = ProcedureToLoad::new("submit");
@@ -1206,7 +1265,8 @@ mod tests {
..Default::default()
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let manager = LocalManager::new(config, state_store);
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager);
manager.manager_ctx.set_running();
manager
@@ -1263,6 +1323,7 @@ mod tests {
content: String,
lock_key: LockKey,
notify: Option<Arc<Notify>>,
poison_keys: PoisonKeys,
}
#[async_trait]
@@ -1287,6 +1348,10 @@ mod tests {
self.notify.as_ref().unwrap().notify_one();
Ok(())
}
fn poison_keys(&self) -> PoisonKeys {
self.poison_keys.clone()
}
}
impl ProcedureToRecover {
@@ -1294,6 +1359,7 @@ mod tests {
ProcedureToRecover {
content: content.to_string(),
lock_key: LockKey::default(),
poison_keys: PoisonKeys::default(),
notify: None,
}
}
@@ -1303,6 +1369,7 @@ mod tests {
let procedure = ProcedureToRecover {
content: json.to_string(),
lock_key: LockKey::default(),
poison_keys: PoisonKeys::default(),
notify: Some(notify.clone()),
};
Ok(Box::new(procedure) as _)
@@ -1323,7 +1390,8 @@ mod tests {
..Default::default()
};
let state_store = Arc::new(ObjectStateStore::new(object_store.clone()));
let manager = LocalManager::new(config, state_store);
let poison_manager = Arc::new(InMemoryPoisonStore::new());
let manager = LocalManager::new(config, state_store, poison_manager);
manager.manager_ctx.start();
let notify = Arc::new(Notify::new());

View File

@@ -238,11 +238,34 @@ impl Runner {
}
ProcedureState::Done { .. } => return,
ProcedureState::Failed { .. } => return,
ProcedureState::Poisoned { .. } => return,
}
self.execute_once(ctx).await;
}
}
async fn clean_poisons(&mut self) -> Result<()> {
let mut error = None;
for key in self.meta.poison_keys.iter() {
let key = key.to_string();
if let Err(e) = self
.manager_ctx
.poison_manager
.delete_poison(key, self.meta.id.to_string())
.await
{
error!(e; "Failed to clean poisons for procedure: {}", self.meta.id);
error = Some(e);
}
}
// returns the last error if any.
if let Some(e) = error {
return Err(e);
}
Ok(())
}
async fn rollback(&mut self, ctx: &Context, err: Arc<Error>) {
if self.procedure.rollback_supported() {
if let Err(e) = self.procedure.rollback(ctx).await {
@@ -255,7 +278,7 @@ impl Runner {
}
async fn prepare_rollback(&mut self, err: Arc<Error>) {
if let Err(e) = self.write_procedure_state(err.to_string()).await {
if let Err(e) = self.write_rollback_procedure_state(err.to_string()).await {
self.meta
.set_state(ProcedureState::prepare_rollback(Arc::new(e)));
return;
@@ -288,26 +311,48 @@ impl Runner {
return;
}
// Cleans poisons before persist.
if status.need_clean_poisons() {
if let Err(e) = self.clean_poisons().await {
error!(e; "Failed to clean poison for procedure: {}", self.meta.id);
self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
return;
}
}
if status.need_persist() {
if let Err(err) = self.persist_procedure().await {
self.meta.set_state(ProcedureState::retrying(Arc::new(err)));
if let Err(e) = self.persist_procedure().await {
error!(e; "Failed to persist procedure: {}", self.meta.id);
self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
return;
}
}
match status {
Status::Executing { .. } => (),
Status::Executing { .. } => {}
Status::Suspended { subprocedures, .. } => {
self.on_suspended(subprocedures).await;
}
Status::Done { output } => {
if let Err(e) = self.commit_procedure().await {
error!(e; "Failed to commit procedure: {}", self.meta.id);
self.meta.set_state(ProcedureState::retrying(Arc::new(e)));
return;
}
self.done(output);
}
Status::Poisoned { error, keys } => {
error!(
error;
"Procedure {}-{} is poisoned, keys: {:?}",
self.procedure.type_name(),
self.meta.id,
keys,
);
self.meta
.set_state(ProcedureState::poisoned(keys, Arc::new(error)));
}
}
}
Err(e) => {
@@ -339,7 +384,9 @@ impl Runner {
}
ProcedureState::PrepareRollback { error } => self.prepare_rollback(error).await,
ProcedureState::RollingBack { error } => self.rollback(ctx, error).await,
ProcedureState::Failed { .. } | ProcedureState::Done { .. } => (),
ProcedureState::Failed { .. }
| ProcedureState::Done { .. }
| ProcedureState::Poisoned { .. } => (),
}
}
@@ -362,6 +409,7 @@ impl Runner {
procedure_state,
Some(self.meta.id),
procedure.lock_key(),
procedure.poison_keys(),
procedure.type_name(),
));
let runner = Runner {
@@ -484,7 +532,7 @@ impl Runner {
Ok(())
}
async fn write_procedure_state(&mut self, error: String) -> Result<()> {
async fn write_rollback_procedure_state(&mut self, error: String) -> Result<()> {
// Persists procedure state
let type_name = self.procedure.type_name().to_string();
let data = self.procedure.dump()?;
@@ -539,8 +587,10 @@ mod tests {
use super::*;
use crate::local::test_util;
use crate::procedure::PoisonKeys;
use crate::store::proc_path;
use crate::{ContextProvider, Error, LockKey, Procedure};
use crate::test_util::InMemoryPoisonStore;
use crate::{ContextProvider, Error, LockKey, PoisonKey, Procedure};
const ROOT_ID: &str = "9f805a1f-05f7-490c-9f91-bd56e3cc54c1";
@@ -552,7 +602,9 @@ mod tests {
Runner {
meta,
procedure,
manager_ctx: Arc::new(ManagerContext::new()),
manager_ctx: Arc::new(ManagerContext::new(
Arc::new(InMemoryPoisonStore::default()),
)),
step: 0,
exponential_builder: ExponentialBuilder::default(),
store,
@@ -577,6 +629,16 @@ mod tests {
assert_eq!(files, files_in_dir);
}
fn context_with_provider(
procedure_id: ProcedureId,
provider: Arc<dyn ContextProvider>,
) -> Context {
Context {
procedure_id,
provider,
}
}
fn context_without_provider(procedure_id: ProcedureId) -> Context {
struct MockProvider;
@@ -588,6 +650,14 @@ mod tests {
) -> Result<Option<ProcedureState>> {
unimplemented!()
}
async fn try_put_poison(
&self,
_key: &PoisonKey,
_procedure_id: ProcedureId,
) -> Result<()> {
unimplemented!()
}
}
Context {
@@ -601,6 +671,7 @@ mod tests {
struct ProcedureAdapter<F> {
data: String,
lock_key: LockKey,
poison_keys: PoisonKeys,
exec_fn: F,
rollback_fn: Option<RollbackFn>,
}
@@ -610,6 +681,7 @@ mod tests {
let mut meta = test_util::procedure_meta_for_test();
meta.id = ProcedureId::parse_str(uuid).unwrap();
meta.lock_key = self.lock_key.clone();
meta.poison_keys = self.poison_keys.clone();
Arc::new(meta)
}
@@ -647,6 +719,10 @@ mod tests {
fn lock_key(&self) -> LockKey {
self.lock_key.clone()
}
fn poison_keys(&self) -> PoisonKeys {
self.poison_keys.clone()
}
}
async fn execute_once_normal(persist: bool, first_files: &[&str], second_files: &[&str]) {
@@ -655,7 +731,7 @@ mod tests {
times += 1;
async move {
if times == 1 {
Ok(Status::Executing { persist })
Ok(Status::executing(persist))
} else {
Ok(Status::done())
}
@@ -665,6 +741,7 @@ mod tests {
let normal = ProcedureAdapter {
data: "normal".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
poison_keys: PoisonKeys::default(),
exec_fn,
rollback_fn: None,
};
@@ -729,6 +806,7 @@ mod tests {
let suspend = ProcedureAdapter {
data: "suspend".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
poison_keys: PoisonKeys::default(),
exec_fn,
rollback_fn: None,
};
@@ -753,7 +831,7 @@ mod tests {
async move {
if times == 1 {
time::sleep(Duration::from_millis(200)).await;
Ok(Status::Executing { persist: true })
Ok(Status::executing(true))
} else {
Ok(Status::done())
}
@@ -763,6 +841,7 @@ mod tests {
let child = ProcedureAdapter {
data: "child".to_string(),
lock_key: LockKey::new_exclusive(keys.iter().map(|k| k.to_string())),
poison_keys: PoisonKeys::default(),
exec_fn,
rollback_fn: None,
};
@@ -832,6 +911,7 @@ mod tests {
let parent = ProcedureAdapter {
data: "parent".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
poison_keys: PoisonKeys::default(),
exec_fn,
rollback_fn: None,
};
@@ -843,7 +923,8 @@ mod tests {
let object_store = test_util::new_object_store(&dir);
let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store.clone());
let manager_ctx = Arc::new(ManagerContext::new());
let poison_manager = Arc::new(InMemoryPoisonStore::default());
let manager_ctx = Arc::new(ManagerContext::new(poison_manager));
manager_ctx.start();
// Manually add this procedure to the manager ctx.
assert!(manager_ctx.try_insert_procedure(meta));
@@ -875,10 +956,11 @@ mod tests {
#[tokio::test]
async fn test_running_is_stopped() {
let exec_fn = move |_| async move { Ok(Status::Executing { persist: true }) }.boxed();
let exec_fn = move |_| async move { Ok(Status::executing(true)) }.boxed();
let normal = ProcedureAdapter {
data: "normal".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
poison_keys: PoisonKeys::default(),
exec_fn,
rollback_fn: None,
};
@@ -923,6 +1005,7 @@ mod tests {
let normal = ProcedureAdapter {
data: "fail".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
poison_keys: PoisonKeys::default(),
exec_fn,
rollback_fn: None,
};
@@ -949,6 +1032,7 @@ mod tests {
let fail = ProcedureAdapter {
data: "fail".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
poison_keys: PoisonKeys::default(),
exec_fn,
rollback_fn: None,
};
@@ -985,6 +1069,7 @@ mod tests {
let fail = ProcedureAdapter {
data: "fail".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
poison_keys: PoisonKeys::default(),
exec_fn,
rollback_fn: Some(Box::new(rollback_fn)),
};
@@ -1036,6 +1121,7 @@ mod tests {
let retry_later = ProcedureAdapter {
data: "retry_later".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
poison_keys: PoisonKeys::default(),
exec_fn,
rollback_fn: None,
};
@@ -1072,6 +1158,7 @@ mod tests {
let exceed_max_retry_later = ProcedureAdapter {
data: "exceed_max_retry_later".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
poison_keys: PoisonKeys::default(),
exec_fn,
rollback_fn: None,
};
@@ -1107,6 +1194,7 @@ mod tests {
let exceed_max_retry_later = ProcedureAdapter {
data: "exceed_max_rollback".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
poison_keys: PoisonKeys::default(),
exec_fn,
rollback_fn: Some(Box::new(rollback_fn)),
};
@@ -1149,6 +1237,7 @@ mod tests {
let retry_later = ProcedureAdapter {
data: "rollback_after_retry_fail".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
poison_keys: PoisonKeys::default(),
exec_fn,
rollback_fn: Some(Box::new(rollback_fn)),
};
@@ -1193,6 +1282,7 @@ mod tests {
let fail = ProcedureAdapter {
data: "fail".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table.region-0"),
poison_keys: PoisonKeys::default(),
exec_fn,
rollback_fn: None,
};
@@ -1228,6 +1318,7 @@ mod tests {
let parent = ProcedureAdapter {
data: "parent".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
poison_keys: PoisonKeys::default(),
exec_fn,
rollback_fn: None,
};
@@ -1238,7 +1329,8 @@ mod tests {
let object_store = test_util::new_object_store(&dir);
let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store);
let manager_ctx = Arc::new(ManagerContext::new());
let poison_manager = Arc::new(InMemoryPoisonStore::default());
let manager_ctx = Arc::new(ManagerContext::new(poison_manager));
manager_ctx.start();
// Manually add this procedure to the manager ctx.
assert!(manager_ctx.try_insert_procedure(meta.clone()));
@@ -1251,4 +1343,235 @@ mod tests {
let err = meta.state().error().unwrap().output_msg();
assert!(err.contains("subprocedure failed"), "{err}");
}
#[tokio::test]
async fn test_execute_with_clean_poisons() {
common_telemetry::init_default_ut_logging();
let mut times = 0;
let poison_key = PoisonKey::new("table/1024");
let moved_poison_key = poison_key.clone();
let exec_fn = move |ctx: Context| {
times += 1;
let poison_key = moved_poison_key.clone();
async move {
if times == 1 {
// Put the poison to the context.
ctx.provider
.try_put_poison(&poison_key, ctx.procedure_id)
.await
.unwrap();
Ok(Status::executing_with_clean_poisons(true))
} else {
Ok(Status::Poisoned {
keys: PoisonKeys::new(vec![poison_key.clone()]),
error: Error::external(MockError::new(StatusCode::Unexpected)),
})
}
}
.boxed()
};
let poison = ProcedureAdapter {
data: "poison".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
exec_fn,
rollback_fn: None,
};
let dir = create_temp_dir("clean_poisons");
let meta = poison.new_meta(ROOT_ID);
let object_store = test_util::new_object_store(&dir);
let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
// Use the manager ctx as the context provider.
let ctx = context_with_provider(
meta.id,
runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
);
// Manually add this procedure to the manager ctx.
runner
.manager_ctx
.procedures
.write()
.unwrap()
.insert(meta.id, runner.meta.clone());
runner.manager_ctx.start();
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_running(), "{state:?}");
let procedure_id = runner
.manager_ctx
.poison_manager
.get_poison(&poison_key.to_string())
.await
.unwrap();
// poison key should be deleted.
assert!(procedure_id.is_none());
}
#[tokio::test]
async fn test_execute_failed_after_set_poison() {
let mut times = 0;
let poison_key = PoisonKey::new("table/1024");
let moved_poison_key = poison_key.clone();
let exec_fn = move |ctx: Context| {
times += 1;
let poison_key = moved_poison_key.clone();
async move {
if times == 1 {
Ok(Status::executing(true))
} else {
// Put the poison to the context.
ctx.provider
.try_put_poison(&poison_key, ctx.procedure_id)
.await
.unwrap();
Err(Error::external(MockError::new(StatusCode::Unexpected)))
}
}
.boxed()
};
let poison = ProcedureAdapter {
data: "poison".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
exec_fn,
rollback_fn: None,
};
let dir = create_temp_dir("poison");
let meta = poison.new_meta(ROOT_ID);
let object_store = test_util::new_object_store(&dir);
let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
// Use the manager ctx as the context provider.
let ctx = context_with_provider(
meta.id,
runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
);
// Manually add this procedure to the manager ctx.
runner
.manager_ctx
.procedures
.write()
.unwrap()
.insert(meta.id, runner.meta.clone());
runner.manager_ctx.start();
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_running(), "{state:?}");
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_prepare_rollback(), "{state:?}");
assert!(meta.state().is_prepare_rollback());
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_failed(), "{state:?}");
assert!(meta.state().is_failed());
// Check the poison is set.
let procedure_id = runner
.manager_ctx
.poison_manager
.get_poison(&poison_key.to_string())
.await
.unwrap()
.unwrap();
// If the procedure is poisoned, the poison key shouldn't be deleted.
assert_eq!(&procedure_id.to_string(), ROOT_ID);
}
#[tokio::test]
async fn test_execute_poisoned() {
let mut times = 0;
let poison_key = PoisonKey::new("table/1024");
let moved_poison_key = poison_key.clone();
let exec_fn = move |ctx: Context| {
times += 1;
let poison_key = moved_poison_key.clone();
async move {
if times == 1 {
Ok(Status::executing(true))
} else {
// Put the poison to the context.
ctx.provider
.try_put_poison(&poison_key, ctx.procedure_id)
.await
.unwrap();
Ok(Status::Poisoned {
keys: PoisonKeys::new(vec![poison_key.clone()]),
error: Error::external(MockError::new(StatusCode::Unexpected)),
})
}
}
.boxed()
};
let poison = ProcedureAdapter {
data: "poison".to_string(),
lock_key: LockKey::single_exclusive("catalog.schema.table"),
poison_keys: PoisonKeys::new(vec![poison_key.clone()]),
exec_fn,
rollback_fn: None,
};
let dir = create_temp_dir("poison");
let meta = poison.new_meta(ROOT_ID);
let object_store = test_util::new_object_store(&dir);
let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
let mut runner = new_runner(meta.clone(), Box::new(poison), procedure_store.clone());
// Use the manager ctx as the context provider.
let ctx = context_with_provider(
meta.id,
runner.manager_ctx.clone() as Arc<dyn ContextProvider>,
);
// Manually add this procedure to the manager ctx.
runner
.manager_ctx
.procedures
.write()
.unwrap()
.insert(meta.id, runner.meta.clone());
runner.manager_ctx.start();
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_running(), "{state:?}");
runner.execute_once(&ctx).await;
let state = runner.meta.state();
assert!(state.is_poisoned(), "{state:?}");
assert!(meta.state().is_poisoned());
check_files(
&object_store,
&procedure_store,
ctx.procedure_id,
&["0000000000.step"],
)
.await;
// Check the poison is set.
let procedure_id = runner
.manager_ctx
.poison_manager
.get_poison(&poison_key.to_string())
.await
.unwrap()
.unwrap();
// If the procedure is poisoned, the poison key shouldn't be deleted.
assert_eq!(procedure_id, ROOT_ID);
}
}

View File

@@ -14,6 +14,7 @@
use std::any::Any;
use std::fmt;
use std::fmt::Display;
use std::str::FromStr;
use std::sync::Arc;
@@ -35,6 +36,8 @@ pub enum Status {
Executing {
/// Whether the framework needs to persist the procedure.
persist: bool,
/// Whether the framework needs to clean the poisons.
clean_poisons: bool,
},
/// The procedure has suspended itself and is waiting for subprocedures.
Suspended {
@@ -42,6 +45,13 @@ pub enum Status {
/// Whether the framework needs to persist the procedure.
persist: bool,
},
/// The procedure is poisoned.
Poisoned {
/// The keys that cause the procedure to be poisoned.
keys: PoisonKeys,
/// The error that cause the procedure to be poisoned.
error: Error,
},
/// the procedure is done.
Done { output: Option<Output> },
}
@@ -49,7 +59,18 @@ pub enum Status {
impl Status {
/// Returns a [Status::Executing] with given `persist` flag.
pub fn executing(persist: bool) -> Status {
Status::Executing { persist }
Status::Executing {
persist,
clean_poisons: false,
}
}
/// Returns a [Status::Executing] with given `persist` flag and clean poisons.
pub fn executing_with_clean_poisons(persist: bool) -> Status {
Status::Executing {
persist,
clean_poisons: true,
}
}
/// Returns a [Status::Done] without output.
@@ -86,11 +107,20 @@ impl Status {
/// Returns `true` if the procedure needs the framework to persist its intermediate state.
pub fn need_persist(&self) -> bool {
// If the procedure is done, the framework doesn't need to persist the procedure
// anymore. It only needs to mark the procedure as committed.
match self {
Status::Executing { persist } | Status::Suspended { persist, .. } => *persist,
Status::Done { .. } => false,
// If the procedure is done/poisoned, the framework doesn't need to persist the procedure
// anymore. It only needs to mark the procedure as committed.
Status::Executing { persist, .. } | Status::Suspended { persist, .. } => *persist,
Status::Done { .. } | Status::Poisoned { .. } => false,
}
}
/// Returns `true` if the framework needs to clean the poisons.
pub fn need_clean_poisons(&self) -> bool {
match self {
Status::Executing { clean_poisons, .. } => *clean_poisons,
Status::Done { .. } => true,
_ => false,
}
}
}
@@ -100,6 +130,12 @@ impl Status {
pub trait ContextProvider: Send + Sync {
/// Query the procedure state.
async fn procedure_state(&self, procedure_id: ProcedureId) -> Result<Option<ProcedureState>>;
/// Try to put a poison key for a procedure.
///
/// This method is used to mark a resource as being operated on by a procedure.
/// If the poison key already exists with a different value, the operation will fail.
async fn try_put_poison(&self, key: &PoisonKey, procedure_id: ProcedureId) -> Result<()>;
}
/// Reference-counted pointer to [ContextProvider].
@@ -147,6 +183,11 @@ pub trait Procedure: Send {
/// Returns the [LockKey] that this procedure needs to acquire.
fn lock_key(&self) -> LockKey;
/// Returns the [PoisonKeys] that may cause this procedure to become poisoned during execution.
fn poison_keys(&self) -> PoisonKeys {
PoisonKeys::default()
}
}
#[async_trait]
@@ -174,6 +215,54 @@ impl<T: Procedure + ?Sized> Procedure for Box<T> {
fn lock_key(&self) -> LockKey {
(**self).lock_key()
}
fn poison_keys(&self) -> PoisonKeys {
(**self).poison_keys()
}
}
#[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct PoisonKey(String);
impl Display for PoisonKey {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}
impl PoisonKey {
/// Creates a new [PoisonKey] from a [String].
pub fn new(key: impl Into<String>) -> Self {
Self(key.into())
}
}
/// A collection of [PoisonKey]s.
///
/// This type is used to represent the keys that may cause the procedure to become poisoned during execution.
#[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, Default)]
pub struct PoisonKeys(SmallVec<[PoisonKey; 2]>);
impl PoisonKeys {
/// Creates a new [PoisonKeys] from a [String].
pub fn single(key: impl Into<String>) -> Self {
Self(smallvec![PoisonKey::new(key)])
}
/// Creates a new [PoisonKeys] from a [PoisonKey].
pub fn new(keys: impl IntoIterator<Item = PoisonKey>) -> Self {
Self(keys.into_iter().collect())
}
/// Returns `true` if the [PoisonKeys] contains the given [PoisonKey].
pub fn contains(&self, key: &PoisonKey) -> bool {
self.0.contains(key)
}
/// Returns an iterator over the [PoisonKey]s.
pub fn iter(&self) -> impl Iterator<Item = &PoisonKey> {
self.0.iter()
}
}
#[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
@@ -325,6 +414,8 @@ pub enum ProcedureState {
RollingBack { error: Arc<Error> },
/// The procedure is failed and cannot proceed anymore.
Failed { error: Arc<Error> },
/// The procedure is poisoned.
Poisoned { keys: PoisonKeys, error: Arc<Error> },
}
impl ProcedureState {
@@ -348,6 +439,11 @@ impl ProcedureState {
ProcedureState::Retrying { error }
}
/// Returns a [ProcedureState] with poisoned state.
pub fn poisoned(keys: PoisonKeys, error: Arc<Error>) -> ProcedureState {
ProcedureState::Poisoned { keys, error }
}
/// Returns true if the procedure state is running.
pub fn is_running(&self) -> bool {
matches!(self, ProcedureState::Running)
@@ -358,6 +454,11 @@ impl ProcedureState {
matches!(self, ProcedureState::Done { .. })
}
/// Returns true if the procedure state is poisoned.
pub fn is_poisoned(&self) -> bool {
matches!(self, ProcedureState::Poisoned { .. })
}
/// Returns true if the procedure state failed.
pub fn is_failed(&self) -> bool {
matches!(self, ProcedureState::Failed { .. })
@@ -384,6 +485,7 @@ impl ProcedureState {
ProcedureState::Failed { error } => Some(error),
ProcedureState::Retrying { error } => Some(error),
ProcedureState::RollingBack { error } => Some(error),
ProcedureState::Poisoned { error, .. } => Some(error),
_ => None,
}
}
@@ -397,6 +499,7 @@ impl ProcedureState {
ProcedureState::Failed { .. } => "Failed",
ProcedureState::PrepareRollback { .. } => "PrepareRollback",
ProcedureState::RollingBack { .. } => "RollingBack",
ProcedureState::Poisoned { .. } => "Poisoned",
}
}
}
@@ -470,12 +573,18 @@ mod tests {
#[test]
fn test_status() {
let status = Status::Executing { persist: false };
let status = Status::executing(false);
assert!(!status.need_persist());
let status = Status::Executing { persist: true };
let status = Status::executing(true);
assert!(status.need_persist());
let status = Status::executing_with_clean_poisons(false);
assert!(status.need_clean_poisons());
let status = Status::executing_with_clean_poisons(true);
assert!(status.need_clean_poisons());
let status = Status::Suspended {
subprocedures: Vec::new(),
persist: false,
@@ -490,6 +599,7 @@ mod tests {
let status = Status::done();
assert!(!status.need_persist());
assert!(status.need_clean_poisons());
}
#[test]

View File

@@ -24,6 +24,7 @@ use crate::error::{Result, ToJsonSnafu};
pub(crate) use crate::store::state_store::StateStoreRef;
use crate::ProcedureId;
pub mod poison_store;
pub mod state_store;
pub mod util;
@@ -341,6 +342,7 @@ mod tests {
use object_store::ObjectStore;
use crate::procedure::PoisonKeys;
use crate::store::state_store::ObjectStateStore;
use crate::BoxedProcedure;
@@ -503,6 +505,10 @@ mod tests {
fn lock_key(&self) -> LockKey {
LockKey::default()
}
fn poison_keys(&self) -> PoisonKeys {
PoisonKeys::default()
}
}
#[tokio::test]

View File

@@ -0,0 +1,59 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use async_trait::async_trait;
use crate::error::Result;
pub type PoisonStoreRef = Arc<dyn PoisonStore>;
/// Poison store.
///
/// This trait is used to manage the state of operations on resources, particularly
/// when an operation encounters an unrecoverable error, potentially leading to
/// metadata inconsistency. In such cases, manual intervention is required to
/// resolve the issue before any further operations can be performed on the resource.
///
/// ## Behavior:
/// - **Insertion**: When an operation begins on a resource, a "poison" key is inserted
/// into the state store to indicate the operation is in progress.
/// - **Deletion**: If the operation completes successfully or
/// other cases can ensure the resource is in a consistent state, the poison key is removed
/// from the state store, indicating the resource is in a consistent state.
/// - **Failure Handling**:
/// - If the operation fails or other cases may lead to metadata inconsistency,
/// the poison key remains in the state store.
/// - The presence of this key indicates that the resource has encountered an
/// unrecoverable error and the metadata may be inconsistent.
/// - New operations on the same resource are rejected until the resource is
/// manually recovered and the poison key is removed.
#[async_trait]
pub trait PoisonStore: Send + Sync {
/// Try to put the poison key.
///
/// If the poison key already exists with a different value, the operation will fail.
async fn try_put_poison(&self, key: String, token: String) -> Result<()>;
/// Delete the poison key.
///
/// If the poison key exists with a different value, the operation will fail.
async fn delete_poison(&self, key: String, token: String) -> Result<()>;
/// Get the poison key.
///
/// If the poison key does not exist, the operation will return `None`.
async fn get_poison(&self, key: &str) -> Result<Option<String>>;
}

View File

@@ -0,0 +1,85 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use snafu::ensure;
use super::*;
use crate::error;
use crate::store::poison_store::PoisonStore;
/// A poison store that uses an in-memory map to store the poison state.
#[derive(Debug, Default)]
pub struct InMemoryPoisonStore {
map: Arc<RwLock<HashMap<String, String>>>,
}
impl InMemoryPoisonStore {
/// Create a new in-memory poison manager.
pub fn new() -> Self {
Self::default()
}
}
#[async_trait::async_trait]
impl PoisonStore for InMemoryPoisonStore {
async fn try_put_poison(&self, key: String, token: String) -> Result<()> {
let mut map = self.map.write().unwrap();
match map.entry(key) {
Entry::Vacant(v) => {
v.insert(token.to_string());
}
Entry::Occupied(o) => {
let value = o.get();
ensure!(
value == &token,
error::UnexpectedSnafu {
err_msg: format!("The poison is already set by other token {}", value)
}
);
}
}
Ok(())
}
async fn delete_poison(&self, key: String, token: String) -> Result<()> {
let mut map = self.map.write().unwrap();
match map.entry(key) {
Entry::Vacant(_) => {
// do nothing
}
Entry::Occupied(o) => {
let value = o.get();
ensure!(
value == &token,
error::UnexpectedSnafu {
err_msg: format!("The poison is not set by the token {}", value)
}
);
o.remove();
}
}
Ok(())
}
async fn get_poison(&self, key: &str) -> Result<Option<String>> {
let map = self.map.read().unwrap();
let key = key.to_string();
Ok(map.get(&key).cloned())
}
}

View File

@@ -43,6 +43,10 @@ pub async fn wait(watcher: &mut Watcher) -> Result<Option<Output>> {
ProcedureState::PrepareRollback { error } => {
debug!("commit rollback, source: {}", error)
}
ProcedureState::Poisoned { error, .. } => {
debug!("poisoned, source: {}", error);
return Err(error.clone()).context(ProcedureExecSnafu);
}
}
}
}
@@ -61,7 +65,9 @@ mod tests {
use super::*;
use crate::error::Error;
use crate::local::{test_util, LocalManager, ManagerConfig};
use crate::procedure::PoisonKeys;
use crate::store::state_store::ObjectStateStore;
use crate::test_util::InMemoryPoisonStore;
use crate::{
Context, LockKey, Procedure, ProcedureId, ProcedureManager, ProcedureWithId, Status,
};
@@ -76,7 +82,8 @@ mod tests {
..Default::default()
};
let state_store = Arc::new(ObjectStateStore::new(test_util::new_object_store(&dir)));
let manager = LocalManager::new(config, state_store);
let poison_manager = Arc::new(InMemoryPoisonStore::default());
let manager = LocalManager::new(config, state_store, poison_manager);
manager.start().await.unwrap();
#[derive(Debug)]
@@ -106,6 +113,10 @@ mod tests {
fn lock_key(&self) -> LockKey {
LockKey::single_exclusive("test.submit")
}
fn poison_keys(&self) -> PoisonKeys {
PoisonKeys::default()
}
}
let procedure_id = ProcedureId::random();

View File

@@ -113,7 +113,7 @@ impl Instance {
.context(error::OpenRaftEngineBackendSnafu)?;
let kv_backend = Arc::new(kv_backend);
let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
let kv_state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
let manager_config = ManagerConfig {
max_retry_times: procedure_config.max_retry_times,
@@ -121,7 +121,11 @@ impl Instance {
max_running_procedures: procedure_config.max_running_procedures,
..Default::default()
};
let procedure_manager = Arc::new(LocalManager::new(manager_config, state_store));
let procedure_manager = Arc::new(LocalManager::new(
manager_config,
kv_state_store.clone(),
kv_state_store,
));
Ok((kv_backend, procedure_manager))
}

View File

@@ -442,13 +442,20 @@ fn build_procedure_manager(
max_running_procedures: options.procedure.max_running_procedures,
..Default::default()
};
let state_store = KvStateStore::new(kv_backend.clone()).with_max_value_size(
options
.procedure
.max_metadata_value_size
.map(|v| v.as_bytes() as usize),
let kv_state_store = Arc::new(
KvStateStore::new(kv_backend.clone()).with_max_value_size(
options
.procedure
.max_metadata_value_size
.map(|v| v.as_bytes() as usize),
),
);
Arc::new(LocalManager::new(manager_config, Arc::new(state_store)))
Arc::new(LocalManager::new(
manager_config,
kv_state_store.clone(),
kv_state_store,
))
}
impl Default for MetasrvBuilder {

View File

@@ -31,6 +31,7 @@ use common_meta::sequence::SequenceBuilder;
use common_meta::state_store::KvStateStore;
use common_meta::DatanodeId;
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::test_util::InMemoryPoisonStore;
use common_procedure::{Context as ProcedureContext, ProcedureId, ProcedureManagerRef, Status};
use common_procedure_test::MockContextProvider;
use common_telemetry::debug;
@@ -85,7 +86,12 @@ impl TestingEnv {
let opening_region_keeper = Arc::new(MemoryRegionKeeper::default());
let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
let procedure_manager = Arc::new(LocalManager::new(ManagerConfig::default(), state_store));
let poison_manager = Arc::new(InMemoryPoisonStore::default());
let procedure_manager = Arc::new(LocalManager::new(
ManagerConfig::default(),
state_store,
poison_manager,
));
Self {
table_metadata_manager,

View File

@@ -224,7 +224,13 @@ async fn test_on_datanode_create_regions() {
});
let status = procedure.on_datanode_create_regions().await.unwrap();
assert!(matches!(status, Status::Executing { persist: true }));
assert!(matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false,
}
));
assert!(matches!(
procedure.creator.data.state,
CreateTableState::CreateMetadata
@@ -291,7 +297,13 @@ async fn test_on_datanode_create_logical_regions() {
procedure.check_tables_already_exist().await.unwrap();
let status = procedure.on_datanode_create_regions().await.unwrap();
assert!(matches!(status, Status::Executing { persist: true }));
assert!(matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false,
}
));
assert!(matches!(
procedure.data.state(),
&CreateTablesState::CreateMetadata

View File

@@ -392,7 +392,13 @@ mod tests {
// Step 1: Test `on_prepare`.
let status = procedure.on_prepare().await.unwrap();
assert_matches!(status, Status::Executing { persist: true });
assert_matches!(
status,
Status::Executing {
persist: true,
clean_poisons: false
}
);
assert_matches!(procedure.data.state, WalPruneState::Prune);
assert_eq!(procedure.data.min_flushed_entry_id, min_last_entry_id);