diff --git a/src/common/procedure-test/src/lib.rs b/src/common/procedure-test/src/lib.rs index 4a5a2b85cb..9e98a4972e 100644 --- a/src/common/procedure-test/src/lib.rs +++ b/src/common/procedure-test/src/lib.rs @@ -18,11 +18,13 @@ use std::collections::HashMap; use std::sync::Arc; use async_trait::async_trait; +use common_procedure::local::{acquire_dynamic_key_lock, DynamicKeyLockGuard}; +use common_procedure::rwlock::KeyRwLock; use common_procedure::store::poison_store::PoisonStore; use common_procedure::test_util::InMemoryPoisonStore; use common_procedure::{ Context, ContextProvider, Output, PoisonKey, Procedure, ProcedureId, ProcedureState, - ProcedureWithId, Result, Status, + ProcedureWithId, Result, Status, StringKey, }; /// A Mock [ContextProvider]. @@ -30,6 +32,7 @@ use common_procedure::{ pub struct MockContextProvider { states: HashMap, poison_manager: InMemoryPoisonStore, + dynamic_key_lock: Arc>, } impl MockContextProvider { @@ -38,6 +41,7 @@ impl MockContextProvider { MockContextProvider { states, poison_manager: InMemoryPoisonStore::default(), + dynamic_key_lock: Arc::new(KeyRwLock::new()), } } @@ -58,6 +62,10 @@ impl ContextProvider for MockContextProvider { .try_put_poison(key.to_string(), procedure_id.to_string()) .await } + + async fn acquire_lock(&self, key: &StringKey) -> DynamicKeyLockGuard { + acquire_dynamic_key_lock(&self.dynamic_key_lock, key).await + } } /// Executes a procedure until it returns [Status::Done]. diff --git a/src/common/procedure/src/lib.rs b/src/common/procedure/src/lib.rs index 69b4f2fa5c..2cd7c39d0c 100644 --- a/src/common/procedure/src/lib.rs +++ b/src/common/procedure/src/lib.rs @@ -20,6 +20,7 @@ pub mod error; pub mod local; pub mod options; mod procedure; +pub mod rwlock; pub mod store; pub mod watcher; @@ -28,8 +29,8 @@ pub mod test_util; pub use crate::error::{Error, Result}; pub use crate::procedure::{ - BoxedProcedure, BoxedProcedureLoader, Context, ContextProvider, LockKey, Output, ParseIdError, - PoisonKey, PoisonKeys, Procedure, ProcedureId, ProcedureInfo, ProcedureManager, - ProcedureManagerRef, ProcedureState, ProcedureWithId, Status, StringKey, + BoxedProcedure, BoxedProcedureLoader, Context, ContextProvider, ContextProviderRef, LockKey, + Output, ParseIdError, PoisonKey, PoisonKeys, Procedure, ProcedureId, ProcedureInfo, + ProcedureManager, ProcedureManagerRef, ProcedureState, ProcedureWithId, Status, StringKey, }; pub use crate::watcher::Watcher; diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index fb7f93267a..811f858d92 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -13,7 +13,6 @@ // limitations under the License. mod runner; -mod rwlock; use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet, VecDeque}; @@ -30,7 +29,6 @@ use snafu::{ensure, OptionExt, ResultExt}; use tokio::sync::watch::{self, Receiver, Sender}; use tokio::sync::{Mutex as TokioMutex, Notify}; -use self::rwlock::KeyRwLock; use crate::error::{ self, DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ManagerNotStartSnafu, PoisonKeyNotDefinedSnafu, ProcedureNotFoundSnafu, Result, StartRemoveOutdatedMetaTaskSnafu, @@ -38,11 +36,12 @@ use crate::error::{ }; use crate::local::runner::Runner; use crate::procedure::{BoxedProcedureLoader, InitProcedureState, PoisonKeys, ProcedureInfo}; +use crate::rwlock::{KeyRwLock, OwnedKeyRwLockGuard}; use crate::store::poison_store::PoisonStoreRef; use crate::store::{ProcedureMessage, ProcedureMessages, ProcedureStore, StateStoreRef}; use crate::{ BoxedProcedure, ContextProvider, LockKey, PoisonKey, ProcedureId, ProcedureManager, - ProcedureState, ProcedureWithId, Watcher, + ProcedureState, ProcedureWithId, StringKey, Watcher, }; /// The expired time of a procedure's metadata. @@ -157,12 +156,80 @@ struct LoadedProcedure { step: u32, } +/// The dynamic lock for procedure execution. +/// +/// Unlike the procedure-level locks, these locks are acquired dynamically by the procedure +/// during execution. They are only held when the procedure specifically needs these keys +/// and are released as soon as the procedure no longer needs them. +/// This allows for more fine-grained concurrency control during procedure execution. +pub(crate) type DynamicKeyLock = Arc>; + +/// Acquires a dynamic key lock for the given key. +/// +/// This function takes a reference to the dynamic key lock and a pointer to the key. +/// It then matches the key type and acquires the appropriate lock. +pub async fn acquire_dynamic_key_lock( + lock: &DynamicKeyLock, + key: &StringKey, +) -> DynamicKeyLockGuard { + match key { + StringKey::Share(key) => { + let guard = lock.read(key.to_string()).await; + DynamicKeyLockGuard { + guard: Some(OwnedKeyRwLockGuard::from(guard)), + key: key.to_string(), + lock: lock.clone(), + } + } + StringKey::Exclusive(key) => { + let guard = lock.write(key.to_string()).await; + DynamicKeyLockGuard { + guard: Some(OwnedKeyRwLockGuard::from(guard)), + key: key.to_string(), + lock: lock.clone(), + } + } + } +} +/// A guard for the dynamic key lock. +/// +/// This guard is used to release the lock when the procedure no longer needs it. +/// It also ensures that the lock is cleaned up when the guard is dropped. +pub struct DynamicKeyLockGuard { + guard: Option, + key: String, + lock: DynamicKeyLock, +} + +impl Drop for DynamicKeyLockGuard { + fn drop(&mut self) { + if let Some(guard) = self.guard.take() { + drop(guard); + } + self.lock.clean_keys(&[self.key.to_string()]); + } +} + /// Shared context of the manager. pub(crate) struct ManagerContext { /// Procedure loaders. The key is the type name of the procedure which the loader returns. loaders: Mutex>, + /// The key lock for the procedure. + /// + /// The lock keys are defined in `Procedure::lock_key()`. + /// These locks are acquired before the procedure starts and released after the procedure finishes. + /// They ensure exclusive access to resources throughout the entire procedure lifecycle. key_lock: KeyRwLock, + /// The dynamic lock for procedure execution. + /// + /// Unlike the procedure-level locks, these locks are acquired dynamically by the procedure + /// during execution. They are only held when the procedure specifically needs these keys + /// and are released as soon as the procedure no longer needs them. + /// This allows for more fine-grained concurrency control during procedure execution. + dynamic_key_lock: DynamicKeyLock, + /// Procedures in the manager. procedures: RwLock>, + /// Running procedures. running_procedures: Mutex>, /// Ids and finished time of finished procedures. finished_procedures: Mutex>, @@ -199,6 +266,10 @@ impl ContextProvider for ManagerContext { let procedure_id = procedure_id.to_string(); self.poison_manager.try_put_poison(key, procedure_id).await } + + async fn acquire_lock(&self, key: &StringKey) -> DynamicKeyLockGuard { + acquire_dynamic_key_lock(&self.dynamic_key_lock, key).await + } } impl ManagerContext { @@ -206,6 +277,7 @@ impl ManagerContext { fn new(poison_manager: PoisonStoreRef) -> ManagerContext { ManagerContext { key_lock: KeyRwLock::new(), + dynamic_key_lock: Arc::new(KeyRwLock::new()), loaders: Mutex::new(HashMap::new()), procedures: RwLock::new(HashMap::new()), running_procedures: Mutex::new(HashSet::new()), diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs index 41067b5e01..c2df1a9466 100644 --- a/src/common/procedure/src/local/runner.rs +++ b/src/common/procedure/src/local/runner.rs @@ -23,9 +23,9 @@ use snafu::ResultExt; use tokio::time; use crate::error::{self, ProcedurePanicSnafu, Result, RollbackTimesExceededSnafu}; -use crate::local::rwlock::OwnedKeyRwLockGuard; use crate::local::{ManagerContext, ProcedureMeta, ProcedureMetaRef}; use crate::procedure::{Output, StringKey}; +use crate::rwlock::OwnedKeyRwLockGuard; use crate::store::{ProcedureMessage, ProcedureStore}; use crate::{ BoxedProcedure, Context, Error, Procedure, ProcedureId, ProcedureState, ProcedureWithId, Status, @@ -581,6 +581,7 @@ impl Runner { #[cfg(test)] mod tests { + use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use async_trait::async_trait; @@ -588,13 +589,14 @@ mod tests { use common_error::mock::MockError; use common_error::status_code::StatusCode; use common_test_util::temp_dir::create_temp_dir; + use futures::future::join_all; use futures_util::future::BoxFuture; use futures_util::FutureExt; use object_store::{EntryMode, ObjectStore}; use tokio::sync::mpsc; use super::*; - use crate::local::test_util; + use crate::local::{test_util, DynamicKeyLockGuard}; use crate::procedure::PoisonKeys; use crate::store::proc_path; use crate::test_util::InMemoryPoisonStore; @@ -666,6 +668,10 @@ mod tests { ) -> Result<()> { unimplemented!() } + + async fn acquire_lock(&self, _key: &StringKey) -> DynamicKeyLockGuard { + unimplemented!() + } } Context { @@ -1674,4 +1680,66 @@ mod tests { // If the procedure is poisoned, the poison key shouldn't be deleted. assert_eq!(procedure_id, ROOT_ID); } + + fn test_procedure_with_dynamic_lock( + shared_atomic_value: Arc, + id: u64, + ) -> (BoxedProcedure, Arc) { + let exec_fn = move |ctx: Context| { + let moved_shared_atomic_value = shared_atomic_value.clone(); + let moved_ctx = ctx.clone(); + async move { + debug!("Acquiring write lock, id: {}", id); + let key = StringKey::Exclusive("test_lock".to_string()); + let guard = moved_ctx.provider.acquire_lock(&key).await; + debug!("Acquired write lock, id: {}", id); + let millis = rand::rng().random_range(10..=50); + tokio::time::sleep(Duration::from_millis(millis)).await; + let value = moved_shared_atomic_value.load(Ordering::Relaxed); + moved_shared_atomic_value.store(value + 1, Ordering::Relaxed); + debug!("Dropping write lock, id: {}", id); + drop(guard); + + Ok(Status::done()) + } + .boxed() + }; + + let adapter = ProcedureAdapter { + data: "dynamic_lock".to_string(), + lock_key: LockKey::new_exclusive([]), + poison_keys: PoisonKeys::new([]), + exec_fn, + rollback_fn: None, + }; + let meta = adapter.new_meta(ROOT_ID); + + (Box::new(adapter), meta) + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_execute_with_dynamic_lock() { + common_telemetry::init_default_ut_logging(); + let shared_atomic_value = Arc::new(AtomicU64::new(0)); + let (procedure1, meta1) = test_procedure_with_dynamic_lock(shared_atomic_value.clone(), 1); + let (procedure2, meta2) = test_procedure_with_dynamic_lock(shared_atomic_value.clone(), 2); + + let dir = create_temp_dir("dynamic_lock"); + let object_store = test_util::new_object_store(&dir); + let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); + let mut runner1 = new_runner(meta1.clone(), procedure1, procedure_store.clone()); + let mut runner2 = new_runner(meta2.clone(), procedure2, procedure_store.clone()); + let ctx1 = context_with_provider( + meta1.id, + runner1.manager_ctx.clone() as Arc, + ); + let ctx2 = context_with_provider( + meta2.id, + // use same manager ctx as runner1 + runner1.manager_ctx.clone() as Arc, + ); + let tasks = [runner1.execute_once(&ctx1), runner2.execute_once(&ctx2)]; + join_all(tasks).await; + assert_eq!(shared_atomic_value.load(Ordering::Relaxed), 2); + } } diff --git a/src/common/procedure/src/procedure.rs b/src/common/procedure/src/procedure.rs index 0d3e7509ad..e208f754b0 100644 --- a/src/common/procedure/src/procedure.rs +++ b/src/common/procedure/src/procedure.rs @@ -25,6 +25,7 @@ use snafu::{ResultExt, Snafu}; use uuid::Uuid; use crate::error::{self, Error, Result}; +use crate::local::DynamicKeyLockGuard; use crate::watcher::Watcher; pub type Output = Arc; @@ -144,6 +145,9 @@ pub trait ContextProvider: Send + Sync { /// This method is used to mark a resource as being operated on by a procedure. /// If the poison key already exists with a different value, the operation will fail. async fn try_put_poison(&self, key: &PoisonKey, procedure_id: ProcedureId) -> Result<()>; + + /// Acquires a key lock for the procedure. + async fn acquire_lock(&self, key: &StringKey) -> DynamicKeyLockGuard; } /// Reference-counted pointer to [ContextProvider]. diff --git a/src/common/procedure/src/local/rwlock.rs b/src/common/procedure/src/rwlock.rs similarity index 93% rename from src/common/procedure/src/local/rwlock.rs rename to src/common/procedure/src/rwlock.rs index 538914e395..cd5a7cf374 100644 --- a/src/common/procedure/src/local/rwlock.rs +++ b/src/common/procedure/src/rwlock.rs @@ -18,8 +18,18 @@ use std::sync::{Arc, Mutex}; use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock}; +/// A guard that owns a read or write lock on a key. +/// +/// This enum wraps either a read or write lock guard obtained from a `KeyRwLock`. +/// The guard is automatically released when it is dropped. pub enum OwnedKeyRwLockGuard { + /// Represents a shared read lock on a key. + /// Multiple read locks can be held simultaneously for the same key. Read { _guard: OwnedRwLockReadGuard<()> }, + + /// Represents an exclusive write lock on a key. + /// Only one write lock can be held at a time for a given key, + /// and no read locks can be held simultaneously with a write lock. Write { _guard: OwnedRwLockWriteGuard<()> }, } @@ -36,7 +46,7 @@ impl From> for OwnedKeyRwLockGuard { } /// Locks based on a key, allowing other keys to lock independently. -#[derive(Debug)] +#[derive(Debug, Default)] pub struct KeyRwLock { /// The inner map of locks for specific keys. inner: Mutex>>>, diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index a83bf5dfab..dd82a34589 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -37,7 +37,7 @@ use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue}; use common_meta::key::table_info::TableInfoValue; use common_meta::key::table_route::TableRouteValue; use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; -use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock}; +use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock}; use common_meta::peer::Peer; use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard}; use common_procedure::error::{ @@ -97,9 +97,6 @@ impl PersistentContext { let lock_key = vec![ CatalogLock::Read(&self.catalog).into(), SchemaLock::read(&self.catalog, &self.schema).into(), - // The optimistic updating of table route is not working very well, - // so we need to use the write lock here. - TableLock::Write(region_id.table_id()).into(), RegionLock::Write(region_id).into(), ]; @@ -531,7 +528,11 @@ pub(crate) trait State: Sync + Send + Debug { } /// Yields the next [State] and [Status]. - async fn next(&mut self, ctx: &mut Context) -> Result<(Box, Status)>; + async fn next( + &mut self, + ctx: &mut Context, + procedure_ctx: &ProcedureContext, + ) -> Result<(Box, Status)>; /// Returns as [Any](std::any::Any). fn as_any(&self) -> &dyn Any; @@ -666,14 +667,14 @@ impl Procedure for RegionMigrationProcedure { true } - async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult { + async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult { let state = &mut self.state; let name = state.name(); let _timer = METRIC_META_REGION_MIGRATION_EXECUTE .with_label_values(&[name]) .start_timer(); - let (next, status) = state.next(&mut self.context).await.map_err(|e| { + let (next, status) = state.next(&mut self.context, ctx).await.map_err(|e| { if e.is_retryable() { METRIC_META_REGION_MIGRATION_ERROR .with_label_values(&[name, "retryable"]) @@ -785,7 +786,11 @@ mod tests { #[async_trait::async_trait] #[typetag::serde] impl State for MockState { - async fn next(&mut self, _ctx: &mut Context) -> Result<(Box, Status)> { + async fn next( + &mut self, + _ctx: &mut Context, + _procedure_ctx: &ProcedureContext, + ) -> Result<(Box, Status)> { Ok((Box::new(MockState), Status::done())) } diff --git a/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs b/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs index ba13f7cdea..bbd1e7a0c2 100644 --- a/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs +++ b/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs @@ -20,7 +20,7 @@ use common_meta::distributed_time_constants::REGION_LEASE_SECS; use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; use common_meta::key::datanode_table::RegionInfo; use common_meta::RegionIdent; -use common_procedure::Status; +use common_procedure::{Context as ProcedureContext, Status}; use common_telemetry::{info, warn}; use serde::{Deserialize, Serialize}; use snafu::ResultExt; @@ -40,7 +40,11 @@ pub struct CloseDowngradedRegion; #[async_trait::async_trait] #[typetag::serde] impl State for CloseDowngradedRegion { - async fn next(&mut self, ctx: &mut Context) -> Result<(Box, Status)> { + async fn next( + &mut self, + ctx: &mut Context, + _procedure_ctx: &ProcedureContext, + ) -> Result<(Box, Status)> { if let Err(err) = self.close_downgraded_leader_region(ctx).await { let downgrade_leader_datanode = &ctx.persistent_ctx.from_peer; let region_id = ctx.region_id(); diff --git a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs index a298c5541d..56a28c7fb8 100644 --- a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs @@ -20,7 +20,7 @@ use common_meta::distributed_time_constants::REGION_LEASE_SECS; use common_meta::instruction::{ DowngradeRegion, DowngradeRegionReply, Instruction, InstructionReply, }; -use common_procedure::Status; +use common_procedure::{Context as ProcedureContext, Status}; use common_telemetry::{error, info, warn}; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; @@ -53,7 +53,11 @@ impl Default for DowngradeLeaderRegion { #[async_trait::async_trait] #[typetag::serde] impl State for DowngradeLeaderRegion { - async fn next(&mut self, ctx: &mut Context) -> Result<(Box, Status)> { + async fn next( + &mut self, + ctx: &mut Context, + _procedure_ctx: &ProcedureContext, + ) -> Result<(Box, Status)> { let now = Instant::now(); // Ensures the `leader_region_lease_deadline` must exist after recovering. ctx.volatile_ctx @@ -270,7 +274,7 @@ mod tests { use super::*; use crate::error::Error; - use crate::procedure::region_migration::test_util::TestingEnv; + use crate::procedure::region_migration::test_util::{new_procedure_context, TestingEnv}; use crate::procedure::region_migration::{ContextFactory, PersistentContext}; use crate::procedure::test_util::{ new_close_region_reply, new_downgrade_region_reply, send_mock_reply, @@ -572,7 +576,8 @@ mod tests { }); let timer = Instant::now(); - let (next, _) = state.next(&mut ctx).await.unwrap(); + let procedure_ctx = new_procedure_context(); + let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap(); let elapsed = timer.elapsed().as_secs(); assert!(elapsed < REGION_LEASE_SECS / 2); assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, Some(1)); @@ -606,7 +611,8 @@ mod tests { send_mock_reply(mailbox, rx, |id| { Ok(new_downgrade_region_reply(id, None, true, None)) }); - let (next, _) = state.next(&mut ctx).await.unwrap(); + let procedure_ctx = new_procedure_context(); + let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap(); let update_metadata = next.as_any().downcast_ref::().unwrap(); assert_matches!(update_metadata, UpdateMetadata::Rollback); } diff --git a/src/meta-srv/src/procedure/region_migration/flush_leader_region.rs b/src/meta-srv/src/procedure/region_migration/flush_leader_region.rs index aeeeb836f6..88fab129ad 100644 --- a/src/meta-srv/src/procedure/region_migration/flush_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/flush_leader_region.rs @@ -16,7 +16,7 @@ use std::any::Any; use api::v1::meta::MailboxMessage; use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; -use common_procedure::Status; +use common_procedure::{Context as ProcedureContext, Status}; use common_telemetry::{info, warn}; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; @@ -37,7 +37,11 @@ pub struct PreFlushRegion; #[async_trait::async_trait] #[typetag::serde] impl State for PreFlushRegion { - async fn next(&mut self, ctx: &mut Context) -> Result<(Box, Status)> { + async fn next( + &mut self, + ctx: &mut Context, + _procedure_ctx: &ProcedureContext, + ) -> Result<(Box, Status)> { let timer = Instant::now(); self.flush_region(ctx).await?; ctx.update_flush_leader_region_elapsed(timer); @@ -163,7 +167,7 @@ mod tests { use store_api::storage::RegionId; use super::*; - use crate::procedure::region_migration::test_util::{self, TestingEnv}; + use crate::procedure::region_migration::test_util::{self, new_procedure_context, TestingEnv}; use crate::procedure::region_migration::{ContextFactory, PersistentContext}; use crate::procedure::test_util::{ new_close_region_reply, new_flush_region_reply, send_mock_reply, @@ -277,7 +281,8 @@ mod tests { .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx) .await; send_mock_reply(mailbox, rx, |id| Ok(new_flush_region_reply(id, true, None))); - let (next, _) = state.next(&mut ctx).await.unwrap(); + let procedure_ctx = new_procedure_context(); + let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap(); let update_metadata = next.as_any().downcast_ref::().unwrap(); assert_matches!(update_metadata, UpdateMetadata::Downgrade); diff --git a/src/meta-srv/src/procedure/region_migration/migration_abort.rs b/src/meta-srv/src/procedure/region_migration/migration_abort.rs index d364f0c8b9..d53a7f6a57 100644 --- a/src/meta-srv/src/procedure/region_migration/migration_abort.rs +++ b/src/meta-srv/src/procedure/region_migration/migration_abort.rs @@ -14,7 +14,7 @@ use std::any::Any; -use common_procedure::Status; +use common_procedure::{Context as ProcedureContext, Status}; use common_telemetry::warn; use serde::{Deserialize, Serialize}; @@ -38,7 +38,11 @@ impl RegionMigrationAbort { #[async_trait::async_trait] #[typetag::serde] impl State for RegionMigrationAbort { - async fn next(&mut self, ctx: &mut Context) -> Result<(Box, Status)> { + async fn next( + &mut self, + ctx: &mut Context, + _procedure_ctx: &ProcedureContext, + ) -> Result<(Box, Status)> { warn!( "Region migration is aborted: {}, region_id: {}, from_peer: {}, to_peer: {}, {}", self.reason, diff --git a/src/meta-srv/src/procedure/region_migration/migration_end.rs b/src/meta-srv/src/procedure/region_migration/migration_end.rs index 0aebbd719c..be781f9f75 100644 --- a/src/meta-srv/src/procedure/region_migration/migration_end.rs +++ b/src/meta-srv/src/procedure/region_migration/migration_end.rs @@ -14,7 +14,7 @@ use std::any::Any; -use common_procedure::Status; +use common_procedure::{Context as ProcedureContext, Status}; use serde::{Deserialize, Serialize}; use crate::error::Result; @@ -26,7 +26,11 @@ pub struct RegionMigrationEnd; #[async_trait::async_trait] #[typetag::serde] impl State for RegionMigrationEnd { - async fn next(&mut self, _: &mut Context) -> Result<(Box, Status)> { + async fn next( + &mut self, + _: &mut Context, + _: &ProcedureContext, + ) -> Result<(Box, Status)> { Ok((Box::new(RegionMigrationEnd), Status::done())) } diff --git a/src/meta-srv/src/procedure/region_migration/migration_start.rs b/src/meta-srv/src/procedure/region_migration/migration_start.rs index 4c097631d3..0cf2e79061 100644 --- a/src/meta-srv/src/procedure/region_migration/migration_start.rs +++ b/src/meta-srv/src/procedure/region_migration/migration_start.rs @@ -16,7 +16,7 @@ use std::any::Any; use common_meta::peer::Peer; use common_meta::rpc::router::RegionRoute; -use common_procedure::Status; +use common_procedure::{Context as ProcedureContext, Status}; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionId; @@ -48,7 +48,11 @@ impl State for RegionMigrationStart { /// If the candidate region has been opened on `to_peer`, go to the [UpdateMetadata::Downgrade] state. /// /// Otherwise go to the [OpenCandidateRegion] state. - async fn next(&mut self, ctx: &mut Context) -> Result<(Box, Status)> { + async fn next( + &mut self, + ctx: &mut Context, + _procedure_ctx: &ProcedureContext, + ) -> Result<(Box, Status)> { let region_id = ctx.persistent_ctx.region_id; let region_route = self.retrieve_region_route(ctx, region_id).await?; let to_peer = &ctx.persistent_ctx.to_peer; @@ -172,7 +176,7 @@ mod tests { use super::*; use crate::error::Error; - use crate::procedure::region_migration::test_util::{self, TestingEnv}; + use crate::procedure::region_migration::test_util::{self, new_procedure_context, TestingEnv}; use crate::procedure::region_migration::update_metadata::UpdateMetadata; use crate::procedure::region_migration::{ContextFactory, PersistentContext}; @@ -248,8 +252,8 @@ mod tests { env.create_physical_table_metadata(table_info, region_routes) .await; - - let (next, _) = state.next(&mut ctx).await.unwrap(); + let procedure_ctx = new_procedure_context(); + let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap(); let update_metadata = next.as_any().downcast_ref::().unwrap(); @@ -279,8 +283,8 @@ mod tests { env.create_physical_table_metadata(table_info, region_routes) .await; - - let (next, _) = state.next(&mut ctx).await.unwrap(); + let procedure_ctx = new_procedure_context(); + let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap(); let _ = next.as_any().downcast_ref::().unwrap(); } @@ -305,8 +309,8 @@ mod tests { env.create_physical_table_metadata(table_info, region_routes) .await; - - let (next, _) = state.next(&mut ctx).await.unwrap(); + let procedure_ctx = new_procedure_context(); + let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap(); let _ = next.as_any().downcast_ref::().unwrap(); } @@ -330,8 +334,8 @@ mod tests { env.create_physical_table_metadata(table_info, region_routes) .await; - - let (next, _) = state.next(&mut ctx).await.unwrap(); + let procedure_ctx = new_procedure_context(); + let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap(); let _ = next .as_any() diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs index be466467b4..92be086978 100644 --- a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs @@ -20,7 +20,7 @@ use common_meta::distributed_time_constants::REGION_LEASE_SECS; use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply}; use common_meta::key::datanode_table::RegionInfo; use common_meta::RegionIdent; -use common_procedure::Status; +use common_procedure::{Context as ProcedureContext, Status}; use common_telemetry::info; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; @@ -41,7 +41,11 @@ pub struct OpenCandidateRegion; #[async_trait::async_trait] #[typetag::serde] impl State for OpenCandidateRegion { - async fn next(&mut self, ctx: &mut Context) -> Result<(Box, Status)> { + async fn next( + &mut self, + ctx: &mut Context, + _procedure_ctx: &ProcedureContext, + ) -> Result<(Box, Status)> { let instruction = self.build_open_region_instruction(ctx).await?; let now = Instant::now(); self.open_candidate_region(ctx, instruction).await?; @@ -200,7 +204,7 @@ mod tests { use super::*; use crate::error::Error; - use crate::procedure::region_migration::test_util::{self, TestingEnv}; + use crate::procedure::region_migration::test_util::{self, new_procedure_context, TestingEnv}; use crate::procedure::region_migration::{ContextFactory, PersistentContext}; use crate::procedure::test_util::{ new_close_region_reply, new_open_region_reply, send_mock_reply, @@ -434,8 +438,8 @@ mod tests { .await; send_mock_reply(mailbox, rx, |id| Ok(new_open_region_reply(id, true, None))); - - let (next, _) = state.next(&mut ctx).await.unwrap(); + let procedure_ctx = new_procedure_context(); + let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap(); let vc = ctx.volatile_ctx; assert_eq!( vc.opening_region_guard.unwrap().info(), diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs index 810c1cbb33..54c07dc4f3 100644 --- a/src/meta-srv/src/procedure/region_migration/test_util.rs +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -262,7 +262,8 @@ impl ProcedureMigrationTestSuite { } debug!("suite test: {name} invoking next"); - let result = self.state.next(&mut self.context).await; + let procedure_ctx = new_procedure_context(); + let result = self.state.next(&mut self.context, &procedure_ctx).await; match assertion { Assertion::Simple(state_assert, status_assert) => { @@ -563,3 +564,11 @@ fn test_merge_mailbox_messages() { unreachable!() } } + +/// Returns a new [ProcedureContext]. +pub fn new_procedure_context() -> ProcedureContext { + ProcedureContext { + procedure_id: ProcedureId::random(), + provider: Arc::new(MockContextProvider::default()), + } +} diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata.rs b/src/meta-srv/src/procedure/region_migration/update_metadata.rs index 858669ea21..b8f074b591 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata.rs @@ -18,7 +18,8 @@ pub(crate) mod upgrade_candidate_region; use std::any::Any; -use common_procedure::Status; +use common_meta::lock_key::TableLock; +use common_procedure::{Context as ProcedureContext, Status}; use common_telemetry::warn; use serde::{Deserialize, Serialize}; @@ -42,7 +43,14 @@ pub enum UpdateMetadata { #[async_trait::async_trait] #[typetag::serde] impl State for UpdateMetadata { - async fn next(&mut self, ctx: &mut Context) -> Result<(Box, Status)> { + async fn next( + &mut self, + ctx: &mut Context, + procedure_ctx: &ProcedureContext, + ) -> Result<(Box, Status)> { + let table_id = TableLock::Write(ctx.region_id().table_id()).into(); + let _guard = procedure_ctx.provider.acquire_lock(&table_id).await; + match self { UpdateMetadata::Downgrade => { self.downgrade_leader_region(ctx).await?; diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs index d8bad44871..a83dbd4b37 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs @@ -86,7 +86,7 @@ mod tests { use crate::error::Error; use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion; - use crate::procedure::region_migration::test_util::{self, TestingEnv}; + use crate::procedure::region_migration::test_util::{self, new_procedure_context, TestingEnv}; use crate::procedure::region_migration::update_metadata::UpdateMetadata; use crate::procedure::region_migration::{ContextFactory, PersistentContext, State}; @@ -193,7 +193,8 @@ mod tests { let table_metadata_manager = env.table_metadata_manager(); - let (next, _) = state.next(&mut ctx).await.unwrap(); + let procedure_ctx = new_procedure_context(); + let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap(); let _ = next .as_any() @@ -236,7 +237,8 @@ mod tests { let table_metadata_manager = env.table_metadata_manager(); - let (next, _) = state.next(&mut ctx).await.unwrap(); + let procedure_ctx = new_procedure_context(); + let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap(); let _ = next .as_any() diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs index 9e038bebc6..fe0b5c28da 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs @@ -70,7 +70,7 @@ mod tests { use crate::error::Error; use crate::procedure::region_migration::migration_abort::RegionMigrationAbort; - use crate::procedure::region_migration::test_util::{self, TestingEnv}; + use crate::procedure::region_migration::test_util::{self, new_procedure_context, TestingEnv}; use crate::procedure::region_migration::update_metadata::UpdateMetadata; use crate::procedure::region_migration::{ContextFactory, PersistentContext, State}; use crate::region::supervisor::RegionFailureDetectorControl; @@ -230,7 +230,8 @@ mod tests { let table_metadata_manager = env.table_metadata_manager(); - let (next, _) = state.next(&mut ctx).await.unwrap(); + let procedure_ctx = new_procedure_context(); + let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap(); let _ = next .as_any() diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs index 54910eab2b..42edb52671 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs @@ -196,7 +196,7 @@ mod tests { use crate::error::Error; use crate::procedure::region_migration::close_downgraded_region::CloseDowngradedRegion; - use crate::procedure::region_migration::test_util::{self, TestingEnv}; + use crate::procedure::region_migration::test_util::{self, new_procedure_context, TestingEnv}; use crate::procedure::region_migration::update_metadata::UpdateMetadata; use crate::procedure::region_migration::{ContextFactory, PersistentContext, State}; @@ -469,7 +469,8 @@ mod tests { let table_metadata_manager = env.table_metadata_manager(); - let (next, _) = state.next(&mut ctx).await.unwrap(); + let procedure_ctx = new_procedure_context(); + let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap(); let _ = next .as_any() diff --git a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs index 8f3741dbac..a51ee26b37 100644 --- a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs @@ -17,7 +17,7 @@ use std::time::Duration; use api::v1::meta::MailboxMessage; use common_meta::instruction::{Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply}; -use common_procedure::Status; +use common_procedure::{Context as ProcedureContext, Status}; use common_telemetry::error; use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; @@ -53,7 +53,11 @@ impl Default for UpgradeCandidateRegion { #[async_trait::async_trait] #[typetag::serde] impl State for UpgradeCandidateRegion { - async fn next(&mut self, ctx: &mut Context) -> Result<(Box, Status)> { + async fn next( + &mut self, + ctx: &mut Context, + _procedure_ctx: &ProcedureContext, + ) -> Result<(Box, Status)> { let now = Instant::now(); if self.upgrade_region_with_retry(ctx).await { ctx.update_upgrade_candidate_region_elapsed(now); @@ -231,7 +235,7 @@ mod tests { use super::*; use crate::error::Error; - use crate::procedure::region_migration::test_util::TestingEnv; + use crate::procedure::region_migration::test_util::{new_procedure_context, TestingEnv}; use crate::procedure::region_migration::{ContextFactory, PersistentContext}; use crate::procedure::test_util::{ new_close_region_reply, new_upgrade_region_reply, send_mock_reply, @@ -486,7 +490,8 @@ mod tests { .unwrap(); }); - let (next, _) = state.next(&mut ctx).await.unwrap(); + let procedure_ctx = new_procedure_context(); + let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap(); let update_metadata = next.as_any().downcast_ref::().unwrap(); @@ -544,8 +549,8 @@ mod tests { .await .unwrap(); }); - - let (next, _) = state.next(&mut ctx).await.unwrap(); + let procedure_ctx = new_procedure_context(); + let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap(); let update_metadata = next.as_any().downcast_ref::().unwrap(); assert_matches!(update_metadata, UpdateMetadata::Rollback); @@ -573,8 +578,8 @@ mod tests { send_mock_reply(mailbox, rx, |id| { Ok(new_upgrade_region_reply(id, false, true, None)) }); - - let (next, _) = state.next(&mut ctx).await.unwrap(); + let procedure_ctx = new_procedure_context(); + let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap(); let update_metadata = next.as_any().downcast_ref::().unwrap(); assert_matches!(update_metadata, UpdateMetadata::Rollback); }