feat: optimize region migration concurrency with fine-grained table lock (#6023)

* feat(procedure): add dynamic key locking mechanism

* feat: optimize region migration concurrency with fine-grained table lock

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2025-04-30 14:45:03 +08:00
committed by GitHub
parent b8f9915d47
commit 0bf07d7f91
20 changed files with 292 additions and 67 deletions

View File

@@ -18,11 +18,13 @@ use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use common_procedure::local::{acquire_dynamic_key_lock, DynamicKeyLockGuard};
use common_procedure::rwlock::KeyRwLock;
use common_procedure::store::poison_store::PoisonStore;
use common_procedure::test_util::InMemoryPoisonStore;
use common_procedure::{
Context, ContextProvider, Output, PoisonKey, Procedure, ProcedureId, ProcedureState,
ProcedureWithId, Result, Status,
ProcedureWithId, Result, Status, StringKey,
};
/// A Mock [ContextProvider].
@@ -30,6 +32,7 @@ use common_procedure::{
pub struct MockContextProvider {
states: HashMap<ProcedureId, ProcedureState>,
poison_manager: InMemoryPoisonStore,
dynamic_key_lock: Arc<KeyRwLock<String>>,
}
impl MockContextProvider {
@@ -38,6 +41,7 @@ impl MockContextProvider {
MockContextProvider {
states,
poison_manager: InMemoryPoisonStore::default(),
dynamic_key_lock: Arc::new(KeyRwLock::new()),
}
}
@@ -58,6 +62,10 @@ impl ContextProvider for MockContextProvider {
.try_put_poison(key.to_string(), procedure_id.to_string())
.await
}
async fn acquire_lock(&self, key: &StringKey) -> DynamicKeyLockGuard {
acquire_dynamic_key_lock(&self.dynamic_key_lock, key).await
}
}
/// Executes a procedure until it returns [Status::Done].

View File

@@ -20,6 +20,7 @@ pub mod error;
pub mod local;
pub mod options;
mod procedure;
pub mod rwlock;
pub mod store;
pub mod watcher;
@@ -28,8 +29,8 @@ pub mod test_util;
pub use crate::error::{Error, Result};
pub use crate::procedure::{
BoxedProcedure, BoxedProcedureLoader, Context, ContextProvider, LockKey, Output, ParseIdError,
PoisonKey, PoisonKeys, Procedure, ProcedureId, ProcedureInfo, ProcedureManager,
ProcedureManagerRef, ProcedureState, ProcedureWithId, Status, StringKey,
BoxedProcedure, BoxedProcedureLoader, Context, ContextProvider, ContextProviderRef, LockKey,
Output, ParseIdError, PoisonKey, PoisonKeys, Procedure, ProcedureId, ProcedureInfo,
ProcedureManager, ProcedureManagerRef, ProcedureState, ProcedureWithId, Status, StringKey,
};
pub use crate::watcher::Watcher;

View File

@@ -13,7 +13,6 @@
// limitations under the License.
mod runner;
mod rwlock;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet, VecDeque};
@@ -30,7 +29,6 @@ use snafu::{ensure, OptionExt, ResultExt};
use tokio::sync::watch::{self, Receiver, Sender};
use tokio::sync::{Mutex as TokioMutex, Notify};
use self::rwlock::KeyRwLock;
use crate::error::{
self, DuplicateProcedureSnafu, Error, LoaderConflictSnafu, ManagerNotStartSnafu,
PoisonKeyNotDefinedSnafu, ProcedureNotFoundSnafu, Result, StartRemoveOutdatedMetaTaskSnafu,
@@ -38,11 +36,12 @@ use crate::error::{
};
use crate::local::runner::Runner;
use crate::procedure::{BoxedProcedureLoader, InitProcedureState, PoisonKeys, ProcedureInfo};
use crate::rwlock::{KeyRwLock, OwnedKeyRwLockGuard};
use crate::store::poison_store::PoisonStoreRef;
use crate::store::{ProcedureMessage, ProcedureMessages, ProcedureStore, StateStoreRef};
use crate::{
BoxedProcedure, ContextProvider, LockKey, PoisonKey, ProcedureId, ProcedureManager,
ProcedureState, ProcedureWithId, Watcher,
ProcedureState, ProcedureWithId, StringKey, Watcher,
};
/// The expired time of a procedure's metadata.
@@ -157,12 +156,80 @@ struct LoadedProcedure {
step: u32,
}
/// The dynamic lock for procedure execution.
///
/// Unlike the procedure-level locks, these locks are acquired dynamically by the procedure
/// during execution. They are only held when the procedure specifically needs these keys
/// and are released as soon as the procedure no longer needs them.
/// This allows for more fine-grained concurrency control during procedure execution.
pub(crate) type DynamicKeyLock = Arc<KeyRwLock<String>>;
/// Acquires a dynamic key lock for the given key.
///
/// This function takes a reference to the dynamic key lock and a pointer to the key.
/// It then matches the key type and acquires the appropriate lock.
pub async fn acquire_dynamic_key_lock(
lock: &DynamicKeyLock,
key: &StringKey,
) -> DynamicKeyLockGuard {
match key {
StringKey::Share(key) => {
let guard = lock.read(key.to_string()).await;
DynamicKeyLockGuard {
guard: Some(OwnedKeyRwLockGuard::from(guard)),
key: key.to_string(),
lock: lock.clone(),
}
}
StringKey::Exclusive(key) => {
let guard = lock.write(key.to_string()).await;
DynamicKeyLockGuard {
guard: Some(OwnedKeyRwLockGuard::from(guard)),
key: key.to_string(),
lock: lock.clone(),
}
}
}
}
/// A guard for the dynamic key lock.
///
/// This guard is used to release the lock when the procedure no longer needs it.
/// It also ensures that the lock is cleaned up when the guard is dropped.
pub struct DynamicKeyLockGuard {
guard: Option<OwnedKeyRwLockGuard>,
key: String,
lock: DynamicKeyLock,
}
impl Drop for DynamicKeyLockGuard {
fn drop(&mut self) {
if let Some(guard) = self.guard.take() {
drop(guard);
}
self.lock.clean_keys(&[self.key.to_string()]);
}
}
/// Shared context of the manager.
pub(crate) struct ManagerContext {
/// Procedure loaders. The key is the type name of the procedure which the loader returns.
loaders: Mutex<HashMap<String, BoxedProcedureLoader>>,
/// The key lock for the procedure.
///
/// The lock keys are defined in `Procedure::lock_key()`.
/// These locks are acquired before the procedure starts and released after the procedure finishes.
/// They ensure exclusive access to resources throughout the entire procedure lifecycle.
key_lock: KeyRwLock<String>,
/// The dynamic lock for procedure execution.
///
/// Unlike the procedure-level locks, these locks are acquired dynamically by the procedure
/// during execution. They are only held when the procedure specifically needs these keys
/// and are released as soon as the procedure no longer needs them.
/// This allows for more fine-grained concurrency control during procedure execution.
dynamic_key_lock: DynamicKeyLock,
/// Procedures in the manager.
procedures: RwLock<HashMap<ProcedureId, ProcedureMetaRef>>,
/// Running procedures.
running_procedures: Mutex<HashSet<ProcedureId>>,
/// Ids and finished time of finished procedures.
finished_procedures: Mutex<VecDeque<(ProcedureId, Instant)>>,
@@ -199,6 +266,10 @@ impl ContextProvider for ManagerContext {
let procedure_id = procedure_id.to_string();
self.poison_manager.try_put_poison(key, procedure_id).await
}
async fn acquire_lock(&self, key: &StringKey) -> DynamicKeyLockGuard {
acquire_dynamic_key_lock(&self.dynamic_key_lock, key).await
}
}
impl ManagerContext {
@@ -206,6 +277,7 @@ impl ManagerContext {
fn new(poison_manager: PoisonStoreRef) -> ManagerContext {
ManagerContext {
key_lock: KeyRwLock::new(),
dynamic_key_lock: Arc::new(KeyRwLock::new()),
loaders: Mutex::new(HashMap::new()),
procedures: RwLock::new(HashMap::new()),
running_procedures: Mutex::new(HashSet::new()),

View File

@@ -23,9 +23,9 @@ use snafu::ResultExt;
use tokio::time;
use crate::error::{self, ProcedurePanicSnafu, Result, RollbackTimesExceededSnafu};
use crate::local::rwlock::OwnedKeyRwLockGuard;
use crate::local::{ManagerContext, ProcedureMeta, ProcedureMetaRef};
use crate::procedure::{Output, StringKey};
use crate::rwlock::OwnedKeyRwLockGuard;
use crate::store::{ProcedureMessage, ProcedureStore};
use crate::{
BoxedProcedure, Context, Error, Procedure, ProcedureId, ProcedureState, ProcedureWithId, Status,
@@ -581,6 +581,7 @@ impl Runner {
#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use async_trait::async_trait;
@@ -588,13 +589,14 @@ mod tests {
use common_error::mock::MockError;
use common_error::status_code::StatusCode;
use common_test_util::temp_dir::create_temp_dir;
use futures::future::join_all;
use futures_util::future::BoxFuture;
use futures_util::FutureExt;
use object_store::{EntryMode, ObjectStore};
use tokio::sync::mpsc;
use super::*;
use crate::local::test_util;
use crate::local::{test_util, DynamicKeyLockGuard};
use crate::procedure::PoisonKeys;
use crate::store::proc_path;
use crate::test_util::InMemoryPoisonStore;
@@ -666,6 +668,10 @@ mod tests {
) -> Result<()> {
unimplemented!()
}
async fn acquire_lock(&self, _key: &StringKey) -> DynamicKeyLockGuard {
unimplemented!()
}
}
Context {
@@ -1674,4 +1680,66 @@ mod tests {
// If the procedure is poisoned, the poison key shouldn't be deleted.
assert_eq!(procedure_id, ROOT_ID);
}
fn test_procedure_with_dynamic_lock(
shared_atomic_value: Arc<AtomicU64>,
id: u64,
) -> (BoxedProcedure, Arc<ProcedureMeta>) {
let exec_fn = move |ctx: Context| {
let moved_shared_atomic_value = shared_atomic_value.clone();
let moved_ctx = ctx.clone();
async move {
debug!("Acquiring write lock, id: {}", id);
let key = StringKey::Exclusive("test_lock".to_string());
let guard = moved_ctx.provider.acquire_lock(&key).await;
debug!("Acquired write lock, id: {}", id);
let millis = rand::rng().random_range(10..=50);
tokio::time::sleep(Duration::from_millis(millis)).await;
let value = moved_shared_atomic_value.load(Ordering::Relaxed);
moved_shared_atomic_value.store(value + 1, Ordering::Relaxed);
debug!("Dropping write lock, id: {}", id);
drop(guard);
Ok(Status::done())
}
.boxed()
};
let adapter = ProcedureAdapter {
data: "dynamic_lock".to_string(),
lock_key: LockKey::new_exclusive([]),
poison_keys: PoisonKeys::new([]),
exec_fn,
rollback_fn: None,
};
let meta = adapter.new_meta(ROOT_ID);
(Box::new(adapter), meta)
}
#[tokio::test(flavor = "multi_thread")]
async fn test_execute_with_dynamic_lock() {
common_telemetry::init_default_ut_logging();
let shared_atomic_value = Arc::new(AtomicU64::new(0));
let (procedure1, meta1) = test_procedure_with_dynamic_lock(shared_atomic_value.clone(), 1);
let (procedure2, meta2) = test_procedure_with_dynamic_lock(shared_atomic_value.clone(), 2);
let dir = create_temp_dir("dynamic_lock");
let object_store = test_util::new_object_store(&dir);
let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone()));
let mut runner1 = new_runner(meta1.clone(), procedure1, procedure_store.clone());
let mut runner2 = new_runner(meta2.clone(), procedure2, procedure_store.clone());
let ctx1 = context_with_provider(
meta1.id,
runner1.manager_ctx.clone() as Arc<dyn ContextProvider>,
);
let ctx2 = context_with_provider(
meta2.id,
// use same manager ctx as runner1
runner1.manager_ctx.clone() as Arc<dyn ContextProvider>,
);
let tasks = [runner1.execute_once(&ctx1), runner2.execute_once(&ctx2)];
join_all(tasks).await;
assert_eq!(shared_atomic_value.load(Ordering::Relaxed), 2);
}
}

View File

@@ -25,6 +25,7 @@ use snafu::{ResultExt, Snafu};
use uuid::Uuid;
use crate::error::{self, Error, Result};
use crate::local::DynamicKeyLockGuard;
use crate::watcher::Watcher;
pub type Output = Arc<dyn Any + Send + Sync>;
@@ -144,6 +145,9 @@ pub trait ContextProvider: Send + Sync {
/// This method is used to mark a resource as being operated on by a procedure.
/// If the poison key already exists with a different value, the operation will fail.
async fn try_put_poison(&self, key: &PoisonKey, procedure_id: ProcedureId) -> Result<()>;
/// Acquires a key lock for the procedure.
async fn acquire_lock(&self, key: &StringKey) -> DynamicKeyLockGuard;
}
/// Reference-counted pointer to [ContextProvider].

View File

@@ -18,8 +18,18 @@ use std::sync::{Arc, Mutex};
use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
/// A guard that owns a read or write lock on a key.
///
/// This enum wraps either a read or write lock guard obtained from a `KeyRwLock`.
/// The guard is automatically released when it is dropped.
pub enum OwnedKeyRwLockGuard {
/// Represents a shared read lock on a key.
/// Multiple read locks can be held simultaneously for the same key.
Read { _guard: OwnedRwLockReadGuard<()> },
/// Represents an exclusive write lock on a key.
/// Only one write lock can be held at a time for a given key,
/// and no read locks can be held simultaneously with a write lock.
Write { _guard: OwnedRwLockWriteGuard<()> },
}
@@ -36,7 +46,7 @@ impl From<OwnedRwLockWriteGuard<()>> for OwnedKeyRwLockGuard {
}
/// Locks based on a key, allowing other keys to lock independently.
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct KeyRwLock<K> {
/// The inner map of locks for specific keys.
inner: Mutex<HashMap<K, Arc<RwLock<()>>>>,

View File

@@ -37,7 +37,7 @@ use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue};
use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock};
use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock};
use common_meta::peer::Peer;
use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard};
use common_procedure::error::{
@@ -97,9 +97,6 @@ impl PersistentContext {
let lock_key = vec![
CatalogLock::Read(&self.catalog).into(),
SchemaLock::read(&self.catalog, &self.schema).into(),
// The optimistic updating of table route is not working very well,
// so we need to use the write lock here.
TableLock::Write(region_id.table_id()).into(),
RegionLock::Write(region_id).into(),
];
@@ -531,7 +528,11 @@ pub(crate) trait State: Sync + Send + Debug {
}
/// Yields the next [State] and [Status].
async fn next(&mut self, ctx: &mut Context) -> Result<(Box<dyn State>, Status)>;
async fn next(
&mut self,
ctx: &mut Context,
procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)>;
/// Returns as [Any](std::any::Any).
fn as_any(&self) -> &dyn Any;
@@ -666,14 +667,14 @@ impl Procedure for RegionMigrationProcedure {
true
}
async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &mut self.state;
let name = state.name();
let _timer = METRIC_META_REGION_MIGRATION_EXECUTE
.with_label_values(&[name])
.start_timer();
let (next, status) = state.next(&mut self.context).await.map_err(|e| {
let (next, status) = state.next(&mut self.context, ctx).await.map_err(|e| {
if e.is_retryable() {
METRIC_META_REGION_MIGRATION_ERROR
.with_label_values(&[name, "retryable"])
@@ -785,7 +786,11 @@ mod tests {
#[async_trait::async_trait]
#[typetag::serde]
impl State for MockState {
async fn next(&mut self, _ctx: &mut Context) -> Result<(Box<dyn State>, Status)> {
async fn next(
&mut self,
_ctx: &mut Context,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
Ok((Box::new(MockState), Status::done()))
}

View File

@@ -20,7 +20,7 @@ use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_meta::key::datanode_table::RegionInfo;
use common_meta::RegionIdent;
use common_procedure::Status;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::{info, warn};
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
@@ -40,7 +40,11 @@ pub struct CloseDowngradedRegion;
#[async_trait::async_trait]
#[typetag::serde]
impl State for CloseDowngradedRegion {
async fn next(&mut self, ctx: &mut Context) -> Result<(Box<dyn State>, Status)> {
async fn next(
&mut self,
ctx: &mut Context,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
if let Err(err) = self.close_downgraded_leader_region(ctx).await {
let downgrade_leader_datanode = &ctx.persistent_ctx.from_peer;
let region_id = ctx.region_id();

View File

@@ -20,7 +20,7 @@ use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::instruction::{
DowngradeRegion, DowngradeRegionReply, Instruction, InstructionReply,
};
use common_procedure::Status;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::{error, info, warn};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
@@ -53,7 +53,11 @@ impl Default for DowngradeLeaderRegion {
#[async_trait::async_trait]
#[typetag::serde]
impl State for DowngradeLeaderRegion {
async fn next(&mut self, ctx: &mut Context) -> Result<(Box<dyn State>, Status)> {
async fn next(
&mut self,
ctx: &mut Context,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
let now = Instant::now();
// Ensures the `leader_region_lease_deadline` must exist after recovering.
ctx.volatile_ctx
@@ -270,7 +274,7 @@ mod tests {
use super::*;
use crate::error::Error;
use crate::procedure::region_migration::test_util::TestingEnv;
use crate::procedure::region_migration::test_util::{new_procedure_context, TestingEnv};
use crate::procedure::region_migration::{ContextFactory, PersistentContext};
use crate::procedure::test_util::{
new_close_region_reply, new_downgrade_region_reply, send_mock_reply,
@@ -572,7 +576,8 @@ mod tests {
});
let timer = Instant::now();
let (next, _) = state.next(&mut ctx).await.unwrap();
let procedure_ctx = new_procedure_context();
let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
let elapsed = timer.elapsed().as_secs();
assert!(elapsed < REGION_LEASE_SECS / 2);
assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, Some(1));
@@ -606,7 +611,8 @@ mod tests {
send_mock_reply(mailbox, rx, |id| {
Ok(new_downgrade_region_reply(id, None, true, None))
});
let (next, _) = state.next(&mut ctx).await.unwrap();
let procedure_ctx = new_procedure_context();
let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
assert_matches!(update_metadata, UpdateMetadata::Rollback);
}

View File

@@ -16,7 +16,7 @@ use std::any::Any;
use api::v1::meta::MailboxMessage;
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_procedure::Status;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::{info, warn};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
@@ -37,7 +37,11 @@ pub struct PreFlushRegion;
#[async_trait::async_trait]
#[typetag::serde]
impl State for PreFlushRegion {
async fn next(&mut self, ctx: &mut Context) -> Result<(Box<dyn State>, Status)> {
async fn next(
&mut self,
ctx: &mut Context,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
let timer = Instant::now();
self.flush_region(ctx).await?;
ctx.update_flush_leader_region_elapsed(timer);
@@ -163,7 +167,7 @@ mod tests {
use store_api::storage::RegionId;
use super::*;
use crate::procedure::region_migration::test_util::{self, TestingEnv};
use crate::procedure::region_migration::test_util::{self, new_procedure_context, TestingEnv};
use crate::procedure::region_migration::{ContextFactory, PersistentContext};
use crate::procedure::test_util::{
new_close_region_reply, new_flush_region_reply, send_mock_reply,
@@ -277,7 +281,8 @@ mod tests {
.insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
.await;
send_mock_reply(mailbox, rx, |id| Ok(new_flush_region_reply(id, true, None)));
let (next, _) = state.next(&mut ctx).await.unwrap();
let procedure_ctx = new_procedure_context();
let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
assert_matches!(update_metadata, UpdateMetadata::Downgrade);

View File

@@ -14,7 +14,7 @@
use std::any::Any;
use common_procedure::Status;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::warn;
use serde::{Deserialize, Serialize};
@@ -38,7 +38,11 @@ impl RegionMigrationAbort {
#[async_trait::async_trait]
#[typetag::serde]
impl State for RegionMigrationAbort {
async fn next(&mut self, ctx: &mut Context) -> Result<(Box<dyn State>, Status)> {
async fn next(
&mut self,
ctx: &mut Context,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
warn!(
"Region migration is aborted: {}, region_id: {}, from_peer: {}, to_peer: {}, {}",
self.reason,

View File

@@ -14,7 +14,7 @@
use std::any::Any;
use common_procedure::Status;
use common_procedure::{Context as ProcedureContext, Status};
use serde::{Deserialize, Serialize};
use crate::error::Result;
@@ -26,7 +26,11 @@ pub struct RegionMigrationEnd;
#[async_trait::async_trait]
#[typetag::serde]
impl State for RegionMigrationEnd {
async fn next(&mut self, _: &mut Context) -> Result<(Box<dyn State>, Status)> {
async fn next(
&mut self,
_: &mut Context,
_: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
Ok((Box::new(RegionMigrationEnd), Status::done()))
}

View File

@@ -16,7 +16,7 @@ use std::any::Any;
use common_meta::peer::Peer;
use common_meta::rpc::router::RegionRoute;
use common_procedure::Status;
use common_procedure::{Context as ProcedureContext, Status};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
@@ -48,7 +48,11 @@ impl State for RegionMigrationStart {
/// If the candidate region has been opened on `to_peer`, go to the [UpdateMetadata::Downgrade] state.
///
/// Otherwise go to the [OpenCandidateRegion] state.
async fn next(&mut self, ctx: &mut Context) -> Result<(Box<dyn State>, Status)> {
async fn next(
&mut self,
ctx: &mut Context,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
let region_id = ctx.persistent_ctx.region_id;
let region_route = self.retrieve_region_route(ctx, region_id).await?;
let to_peer = &ctx.persistent_ctx.to_peer;
@@ -172,7 +176,7 @@ mod tests {
use super::*;
use crate::error::Error;
use crate::procedure::region_migration::test_util::{self, TestingEnv};
use crate::procedure::region_migration::test_util::{self, new_procedure_context, TestingEnv};
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::{ContextFactory, PersistentContext};
@@ -248,8 +252,8 @@ mod tests {
env.create_physical_table_metadata(table_info, region_routes)
.await;
let (next, _) = state.next(&mut ctx).await.unwrap();
let procedure_ctx = new_procedure_context();
let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
@@ -279,8 +283,8 @@ mod tests {
env.create_physical_table_metadata(table_info, region_routes)
.await;
let (next, _) = state.next(&mut ctx).await.unwrap();
let procedure_ctx = new_procedure_context();
let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
let _ = next.as_any().downcast_ref::<RegionMigrationEnd>().unwrap();
}
@@ -305,8 +309,8 @@ mod tests {
env.create_physical_table_metadata(table_info, region_routes)
.await;
let (next, _) = state.next(&mut ctx).await.unwrap();
let procedure_ctx = new_procedure_context();
let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
let _ = next.as_any().downcast_ref::<OpenCandidateRegion>().unwrap();
}
@@ -330,8 +334,8 @@ mod tests {
env.create_physical_table_metadata(table_info, region_routes)
.await;
let (next, _) = state.next(&mut ctx).await.unwrap();
let procedure_ctx = new_procedure_context();
let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
let _ = next
.as_any()

View File

@@ -20,7 +20,7 @@ use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
use common_meta::key::datanode_table::RegionInfo;
use common_meta::RegionIdent;
use common_procedure::Status;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
@@ -41,7 +41,11 @@ pub struct OpenCandidateRegion;
#[async_trait::async_trait]
#[typetag::serde]
impl State for OpenCandidateRegion {
async fn next(&mut self, ctx: &mut Context) -> Result<(Box<dyn State>, Status)> {
async fn next(
&mut self,
ctx: &mut Context,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
let instruction = self.build_open_region_instruction(ctx).await?;
let now = Instant::now();
self.open_candidate_region(ctx, instruction).await?;
@@ -200,7 +204,7 @@ mod tests {
use super::*;
use crate::error::Error;
use crate::procedure::region_migration::test_util::{self, TestingEnv};
use crate::procedure::region_migration::test_util::{self, new_procedure_context, TestingEnv};
use crate::procedure::region_migration::{ContextFactory, PersistentContext};
use crate::procedure::test_util::{
new_close_region_reply, new_open_region_reply, send_mock_reply,
@@ -434,8 +438,8 @@ mod tests {
.await;
send_mock_reply(mailbox, rx, |id| Ok(new_open_region_reply(id, true, None)));
let (next, _) = state.next(&mut ctx).await.unwrap();
let procedure_ctx = new_procedure_context();
let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
let vc = ctx.volatile_ctx;
assert_eq!(
vc.opening_region_guard.unwrap().info(),

View File

@@ -262,7 +262,8 @@ impl ProcedureMigrationTestSuite {
}
debug!("suite test: {name} invoking next");
let result = self.state.next(&mut self.context).await;
let procedure_ctx = new_procedure_context();
let result = self.state.next(&mut self.context, &procedure_ctx).await;
match assertion {
Assertion::Simple(state_assert, status_assert) => {
@@ -563,3 +564,11 @@ fn test_merge_mailbox_messages() {
unreachable!()
}
}
/// Returns a new [ProcedureContext].
pub fn new_procedure_context() -> ProcedureContext {
ProcedureContext {
procedure_id: ProcedureId::random(),
provider: Arc::new(MockContextProvider::default()),
}
}

View File

@@ -18,7 +18,8 @@ pub(crate) mod upgrade_candidate_region;
use std::any::Any;
use common_procedure::Status;
use common_meta::lock_key::TableLock;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::warn;
use serde::{Deserialize, Serialize};
@@ -42,7 +43,14 @@ pub enum UpdateMetadata {
#[async_trait::async_trait]
#[typetag::serde]
impl State for UpdateMetadata {
async fn next(&mut self, ctx: &mut Context) -> Result<(Box<dyn State>, Status)> {
async fn next(
&mut self,
ctx: &mut Context,
procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
let table_id = TableLock::Write(ctx.region_id().table_id()).into();
let _guard = procedure_ctx.provider.acquire_lock(&table_id).await;
match self {
UpdateMetadata::Downgrade => {
self.downgrade_leader_region(ctx).await?;

View File

@@ -86,7 +86,7 @@ mod tests {
use crate::error::Error;
use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion;
use crate::procedure::region_migration::test_util::{self, TestingEnv};
use crate::procedure::region_migration::test_util::{self, new_procedure_context, TestingEnv};
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::{ContextFactory, PersistentContext, State};
@@ -193,7 +193,8 @@ mod tests {
let table_metadata_manager = env.table_metadata_manager();
let (next, _) = state.next(&mut ctx).await.unwrap();
let procedure_ctx = new_procedure_context();
let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
let _ = next
.as_any()
@@ -236,7 +237,8 @@ mod tests {
let table_metadata_manager = env.table_metadata_manager();
let (next, _) = state.next(&mut ctx).await.unwrap();
let procedure_ctx = new_procedure_context();
let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
let _ = next
.as_any()

View File

@@ -70,7 +70,7 @@ mod tests {
use crate::error::Error;
use crate::procedure::region_migration::migration_abort::RegionMigrationAbort;
use crate::procedure::region_migration::test_util::{self, TestingEnv};
use crate::procedure::region_migration::test_util::{self, new_procedure_context, TestingEnv};
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::{ContextFactory, PersistentContext, State};
use crate::region::supervisor::RegionFailureDetectorControl;
@@ -230,7 +230,8 @@ mod tests {
let table_metadata_manager = env.table_metadata_manager();
let (next, _) = state.next(&mut ctx).await.unwrap();
let procedure_ctx = new_procedure_context();
let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
let _ = next
.as_any()

View File

@@ -196,7 +196,7 @@ mod tests {
use crate::error::Error;
use crate::procedure::region_migration::close_downgraded_region::CloseDowngradedRegion;
use crate::procedure::region_migration::test_util::{self, TestingEnv};
use crate::procedure::region_migration::test_util::{self, new_procedure_context, TestingEnv};
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::{ContextFactory, PersistentContext, State};
@@ -469,7 +469,8 @@ mod tests {
let table_metadata_manager = env.table_metadata_manager();
let (next, _) = state.next(&mut ctx).await.unwrap();
let procedure_ctx = new_procedure_context();
let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
let _ = next
.as_any()

View File

@@ -17,7 +17,7 @@ use std::time::Duration;
use api::v1::meta::MailboxMessage;
use common_meta::instruction::{Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply};
use common_procedure::Status;
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::error;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
@@ -53,7 +53,11 @@ impl Default for UpgradeCandidateRegion {
#[async_trait::async_trait]
#[typetag::serde]
impl State for UpgradeCandidateRegion {
async fn next(&mut self, ctx: &mut Context) -> Result<(Box<dyn State>, Status)> {
async fn next(
&mut self,
ctx: &mut Context,
_procedure_ctx: &ProcedureContext,
) -> Result<(Box<dyn State>, Status)> {
let now = Instant::now();
if self.upgrade_region_with_retry(ctx).await {
ctx.update_upgrade_candidate_region_elapsed(now);
@@ -231,7 +235,7 @@ mod tests {
use super::*;
use crate::error::Error;
use crate::procedure::region_migration::test_util::TestingEnv;
use crate::procedure::region_migration::test_util::{new_procedure_context, TestingEnv};
use crate::procedure::region_migration::{ContextFactory, PersistentContext};
use crate::procedure::test_util::{
new_close_region_reply, new_upgrade_region_reply, send_mock_reply,
@@ -486,7 +490,8 @@ mod tests {
.unwrap();
});
let (next, _) = state.next(&mut ctx).await.unwrap();
let procedure_ctx = new_procedure_context();
let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
@@ -544,8 +549,8 @@ mod tests {
.await
.unwrap();
});
let (next, _) = state.next(&mut ctx).await.unwrap();
let procedure_ctx = new_procedure_context();
let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
assert_matches!(update_metadata, UpdateMetadata::Rollback);
@@ -573,8 +578,8 @@ mod tests {
send_mock_reply(mailbox, rx, |id| {
Ok(new_upgrade_region_reply(id, false, true, None))
});
let (next, _) = state.next(&mut ctx).await.unwrap();
let procedure_ctx = new_procedure_context();
let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
assert_matches!(update_metadata, UpdateMetadata::Rollback);
}