From d5f52013ecb97bd9eb9be863b8274540c3191f40 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Fri, 14 Nov 2025 16:15:18 +0800 Subject: [PATCH] feat: introduce batch region migration (#7176) * feat: introduce batch region migration Signed-off-by: WenyXu * fix: try fix unit tests Signed-off-by: WenyXu * fix clippy Signed-off-by: WenyXu * fix: fix get table route Signed-off-by: WenyXu * fix unit tests Signed-off-by: WenyXu * chore: avoid cloning vec Signed-off-by: WenyXu * chore: apply suggestions from CR Signed-off-by: WenyXu * fix: fix tests Signed-off-by: WenyXu * chore: apply suggestions Signed-off-by: WenyXu * chore: add suggestions Signed-off-by: WenyXu --------- Signed-off-by: WenyXu --- src/common/meta/src/key/datanode_table.rs | 19 + src/common/meta/src/key/table_route.rs | 33 +- src/common/meta/src/key/topic_region.rs | 2 +- .../src/events/region_migration_event.rs | 8 +- .../src/procedure/region_migration.rs | 577 ++++++++++-------- .../close_downgraded_region.rs | 41 +- .../downgrade_leader_region.rs | 204 ++++--- .../region_migration/flush_leader_region.rs | 72 ++- .../src/procedure/region_migration/manager.rs | 4 +- .../region_migration/migration_abort.rs | 4 +- .../region_migration/migration_start.rs | 209 +++---- .../region_migration/open_candidate_region.rs | 111 ++-- .../procedure/region_migration/test_util.rs | 59 +- .../region_migration/update_metadata.rs | 13 +- .../downgrade_leader_region.rs | 79 ++- .../rollback_downgraded_region.rs | 63 +- .../upgrade_candidate_region.rs | 362 ++++++----- .../upgrade_candidate_region.rs | 250 ++++---- 18 files changed, 1213 insertions(+), 897 deletions(-) diff --git a/src/common/meta/src/key/datanode_table.rs b/src/common/meta/src/key/datanode_table.rs index 68105a478a..8aca2fcaf7 100644 --- a/src/common/meta/src/key/datanode_table.rs +++ b/src/common/meta/src/key/datanode_table.rs @@ -164,6 +164,25 @@ impl DatanodeTableManager { .transpose() } + pub async fn batch_get( + &self, + keys: &[DatanodeTableKey], + ) -> 
Result> { + let req = BatchGetRequest::default().with_keys(keys.iter().map(|k| k.to_bytes()).collect()); + let resp = self.kv_backend.batch_get(req).await?; + let values = resp + .kvs + .into_iter() + .map(|kv| { + Ok(( + DatanodeTableKey::from_bytes(&kv.key)?, + DatanodeTableValue::try_from_raw_value(&kv.value)?, + )) + }) + .collect::>>()?; + Ok(values) + } + pub fn tables( &self, datanode_id: DatanodeId, diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs index 5f6782f002..fe1f11bf15 100644 --- a/src/common/meta/src/key/table_route.rs +++ b/src/common/meta/src/key/table_route.rs @@ -661,13 +661,32 @@ impl TableRouteStorage { /// Returns batch of [`TableRouteValue`] that respects the order of `table_ids`. pub async fn batch_get(&self, table_ids: &[TableId]) -> Result>> { - let mut table_routes = self.batch_get_inner(table_ids).await?; - self.remap_routes_addresses(&mut table_routes).await?; + let raw_table_routes = self.batch_get_inner(table_ids).await?; - Ok(table_routes) + Ok(raw_table_routes + .into_iter() + .map(|v| v.map(|x| x.inner)) + .collect()) } - async fn batch_get_inner(&self, table_ids: &[TableId]) -> Result>> { + /// Returns batch of [`TableRouteValue`] wrapped with [`DeserializedValueWithBytes`]. + /// + /// The return value is a vector of [`Option>`]. + /// Note: This method remaps the addresses of the table routes, but does not update their raw byte representations. 
+ pub async fn batch_get_with_raw_bytes( + &self, + table_ids: &[TableId], + ) -> Result>>> { + let mut raw_table_routes = self.batch_get_inner(table_ids).await?; + self.remap_routes_addresses(&mut raw_table_routes).await?; + + Ok(raw_table_routes) + } + + async fn batch_get_inner( + &self, + table_ids: &[TableId], + ) -> Result>>> { let keys = table_ids .iter() .map(|id| TableRouteKey::new(*id).to_bytes()) @@ -685,7 +704,7 @@ impl TableRouteStorage { keys.into_iter() .map(|key| { if let Some(value) = kvs.get(&key) { - Ok(Some(TableRouteValue::try_from_raw_value(value)?)) + Ok(Some(DeserializedValueWithBytes::from_inner_slice(value)?)) } else { Ok(None) } @@ -695,14 +714,14 @@ impl TableRouteStorage { async fn remap_routes_addresses( &self, - table_routes: &mut [Option], + table_routes: &mut [Option>], ) -> Result<()> { let keys = table_routes .iter() .flat_map(|table_route| { table_route .as_ref() - .map(extract_address_keys) + .map(|x| extract_address_keys(&x.inner)) .unwrap_or_default() }) .collect::>() diff --git a/src/common/meta/src/key/topic_region.rs b/src/common/meta/src/key/topic_region.rs index 844a46735f..c34229cf9e 100644 --- a/src/common/meta/src/key/topic_region.rs +++ b/src/common/meta/src/key/topic_region.rs @@ -33,7 +33,7 @@ use crate::rpc::store::{ // The TopicRegionKey is a key for the topic-region mapping in the kvbackend. // The layout of the key is `__topic_region/{topic_name}/{region_id}`. 
-#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct TopicRegionKey<'a> { pub region_id: RegionId, pub topic: &'a str, diff --git a/src/meta-srv/src/events/region_migration_event.rs b/src/meta-srv/src/events/region_migration_event.rs index 3fc8500599..e5b81d5585 100644 --- a/src/meta-srv/src/events/region_migration_event.rs +++ b/src/meta-srv/src/events/region_migration_event.rs @@ -62,10 +62,12 @@ pub(crate) struct RegionMigrationEvent { impl RegionMigrationEvent { pub fn from_persistent_ctx(ctx: &PersistentContext) -> Self { + // FIXME(weny): handle multiple region ids. + let region_id = ctx.region_ids[0]; Self { - region_id: ctx.region_id, - table_id: ctx.region_id.table_id(), - region_number: ctx.region_id.region_number(), + region_id, + table_id: region_id.table_id(), + region_number: region_id.region_number(), trigger_reason: ctx.trigger_reason, src_node_id: ctx.from_peer.id, src_peer_addr: ctx.from_peer.addr.clone(), diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index 935e59ba33..5f710adcd3 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -26,6 +26,7 @@ pub(crate) mod update_metadata; pub(crate) mod upgrade_candidate_region; use std::any::Any; +use std::collections::{HashMap, HashSet}; use std::fmt::{Debug, Display}; use std::sync::Arc; use std::time::Duration; @@ -36,7 +37,6 @@ use common_meta::cache_invalidator::CacheInvalidatorRef; use common_meta::ddl::RegionFailureDetectorControllerRef; use common_meta::instruction::CacheIdent; use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue}; -use common_meta::key::table_info::TableInfoValue; use common_meta::key::table_route::TableRouteValue; use common_meta::key::topic_region::{ReplayCheckpoint, TopicRegionKey}; use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; @@ -56,9 +56,9 @@ pub use 
manager::{ RegionMigrationManagerRef, RegionMigrationProcedureTask, RegionMigrationProcedureTracker, RegionMigrationTriggerReason, }; -use serde::{Deserialize, Serialize}; +use serde::{Deserialize, Deserializer, Serialize}; use snafu::{OptionExt, ResultExt}; -use store_api::storage::RegionId; +use store_api::storage::{RegionId, TableId}; use tokio::time::Instant; use self::migration_start::RegionMigrationStart; @@ -73,6 +73,25 @@ use crate::service::mailbox::MailboxRef; /// The default timeout for region migration. pub const DEFAULT_REGION_MIGRATION_TIMEOUT: Duration = Duration::from_secs(120); +#[derive(Debug, Deserialize)] +#[serde(untagged)] +enum SingleOrMultiple { + Single(T), + Multiple(Vec), +} + +fn single_or_multiple_from<'de, D, T>(deserializer: D) -> std::result::Result, D::Error> +where + D: Deserializer<'de>, + T: Deserialize<'de>, +{ + let helper = SingleOrMultiple::::deserialize(deserializer)?; + Ok(match helper { + SingleOrMultiple::Single(x) => vec![x], + SingleOrMultiple::Multiple(xs) => xs, + }) +} + /// It's shared in each step and available even after recovering. /// /// It will only be updated/stored after the Red node has succeeded. @@ -89,7 +108,8 @@ pub struct PersistentContext { /// The [Peer] of migration destination. pub(crate) to_peer: Peer, /// The [RegionId] of migration region. - pub(crate) region_id: RegionId, + #[serde(deserialize_with = "single_or_multiple_from", alias = "region_id")] + pub(crate) region_ids: Vec, /// The timeout for downgrading leader region and upgrading candidate region operations. 
#[serde(with = "humantime_serde", default = "default_timeout")] pub(crate) timeout: Duration, @@ -104,14 +124,42 @@ fn default_timeout() -> Duration { impl PersistentContext { pub fn lock_key(&self) -> Vec { - let region_id = self.region_id; - let lock_key = vec![ - CatalogLock::Read(&self.catalog).into(), - SchemaLock::read(&self.catalog, &self.schema).into(), - RegionLock::Write(region_id).into(), - ]; + let mut lock_keys = Vec::with_capacity(self.region_ids.len() + 2); + lock_keys.push(CatalogLock::Read(&self.catalog).into()); + lock_keys.push(SchemaLock::read(&self.catalog, &self.schema).into()); + // Sort the region ids to ensure the same order of region ids. + let mut region_ids = self.region_ids.clone(); + region_ids.sort_unstable(); + for region_id in region_ids { + lock_keys.push(RegionLock::Write(region_id).into()); + } + lock_keys + } - lock_key + /// Returns the table ids of the regions. + /// + /// The return value is a set of table ids. + pub fn region_table_ids(&self) -> Vec { + self.region_ids + .iter() + .map(|region_id| region_id.table_id()) + .collect::>() + .into_iter() + .collect() + } + + /// Returns the table regions map. + /// + /// The key is the table id, the value is the region ids of the table. + pub fn table_regions(&self) -> HashMap> { + let mut table_regions = HashMap::new(); + for region_id in &self.region_ids { + table_regions + .entry(region_id.table_id()) + .or_insert_with(Vec::new) + .push(*region_id); + } + table_regions } } @@ -227,23 +275,18 @@ pub struct VolatileContext { /// `opening_region_guard` will be set after the /// [OpenCandidateRegion](crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion) step. /// - /// `opening_region_guard` should be consumed after + /// `opening_region_guards` should be consumed after /// the corresponding [RegionRoute](common_meta::rpc::router::RegionRoute) of the opening region /// was written into [TableRouteValue](common_meta::key::table_route::TableRouteValue). 
- opening_region_guard: Option, - /// `datanode_table` is stored via previous steps for future use. - from_peer_datanode_table: Option, - /// `table_info` is stored via previous steps for future use. - /// - /// `table_info` should remain unchanged during the procedure; - /// no other DDL procedure executed concurrently for the current table. - table_info: Option>, + opening_region_guards: Vec, /// The deadline of leader region lease. leader_region_lease_deadline: Option, - /// The last_entry_id of leader region. - leader_region_last_entry_id: Option, - /// The last_entry_id of leader metadata region (Only used for metric engine). - leader_region_metadata_last_entry_id: Option, + /// The datanode table values. + from_peer_datanode_table_values: Option>, + /// The last_entry_ids of leader regions. + leader_region_last_entry_ids: HashMap, + /// The last_entry_ids of leader metadata regions (Only used for metric engine). + leader_region_metadata_last_entry_ids: HashMap, /// Metrics of region migration. metrics: Metrics, } @@ -262,13 +305,15 @@ impl VolatileContext { } /// Sets the `leader_region_last_entry_id`. - pub fn set_last_entry_id(&mut self, last_entry_id: u64) { - self.leader_region_last_entry_id = Some(last_entry_id) + pub fn set_last_entry_id(&mut self, region_id: RegionId, last_entry_id: u64) { + self.leader_region_last_entry_ids + .insert(region_id, last_entry_id); } /// Sets the `leader_region_metadata_last_entry_id`. 
- pub fn set_metadata_last_entry_id(&mut self, last_entry_id: u64) { - self.leader_region_metadata_last_entry_id = Some(last_entry_id); + pub fn set_metadata_last_entry_id(&mut self, region_id: RegionId, last_entry_id: u64) { + self.leader_region_metadata_last_entry_ids + .insert(region_id, last_entry_id); } } @@ -317,7 +362,7 @@ impl DefaultContextFactory { impl ContextFactory for DefaultContextFactory { fn new_context(self, persistent_ctx: PersistentContext) -> Context { Context { - persistent_ctx: Arc::new(persistent_ctx), + persistent_ctx, volatile_ctx: self.volatile_ctx, in_memory: self.in_memory_key, table_metadata_manager: self.table_metadata_manager, @@ -332,7 +377,7 @@ impl ContextFactory for DefaultContextFactory { /// The context of procedure execution. pub struct Context { - persistent_ctx: Arc, + persistent_ctx: PersistentContext, volatile_ctx: VolatileContext, in_memory: KvBackendRef, table_metadata_manager: TableMetadataManagerRef, @@ -391,6 +436,47 @@ impl Context { &self.server_addr } + /// Returns the table ids of the regions. + pub fn region_table_ids(&self) -> Vec { + self.persistent_ctx + .region_ids + .iter() + .map(|region_id| region_id.table_id()) + .collect::>() + .into_iter() + .collect() + } + + /// Returns the `table_routes` of [VolatileContext] if any. + /// Otherwise, returns the value retrieved from remote. + /// + /// Retry: + /// - Failed to retrieve the metadata of table. 
+ pub async fn get_table_route_values( + &self, + ) -> Result>> { + let table_ids = self.persistent_ctx.region_table_ids(); + let table_routes = self + .table_metadata_manager + .table_route_manager() + .table_route_storage() + .batch_get_with_raw_bytes(&table_ids) + .await + .context(error::TableMetadataManagerSnafu) + .map_err(BoxedError::new) + .with_context(|_| error::RetryLaterWithSourceSnafu { + reason: format!("Failed to get table routes: {table_ids:?}"), + })?; + let table_routes = table_ids + .into_iter() + .zip(table_routes) + .filter_map(|(table_id, table_route)| { + table_route.map(|table_route| (table_id, table_route)) + }) + .collect::>(); + Ok(table_routes) + } + /// Returns the `table_route` of [VolatileContext] if any. /// Otherwise, returns the value retrieved from remote. /// @@ -398,9 +484,9 @@ impl Context { /// - Failed to retrieve the metadata of table. pub async fn get_table_route_value( &self, + table_id: TableId, ) -> Result> { - let table_id = self.persistent_ctx.region_id.table_id(); - let table_route = self + let table_route_value = self .table_metadata_manager .table_route_manager() .table_route_storage() @@ -409,11 +495,76 @@ impl Context { .context(error::TableMetadataManagerSnafu) .map_err(BoxedError::new) .with_context(|_| error::RetryLaterWithSourceSnafu { - reason: format!("Failed to get TableRoute: {table_id}"), + reason: format!("Failed to get table routes: {table_id:}"), })? .context(error::TableRouteNotFoundSnafu { table_id })?; + Ok(table_route_value) + } - Ok(table_route) + /// Returns the `from_peer_datanode_table_values` of [VolatileContext] if any. + /// Otherwise, returns the value retrieved from remote. + /// + /// Retry: + /// - Failed to retrieve the metadata of datanode table. 
+ pub async fn get_from_peer_datanode_table_values( + &mut self, + ) -> Result<&HashMap> { + let from_peer_datanode_table_values = + &mut self.volatile_ctx.from_peer_datanode_table_values; + if from_peer_datanode_table_values.is_none() { + let table_ids = self.persistent_ctx.region_table_ids(); + let datanode_table_keys = table_ids + .iter() + .map(|table_id| DatanodeTableKey { + datanode_id: self.persistent_ctx.from_peer.id, + table_id: *table_id, + }) + .collect::>(); + let datanode_table_values = self + .table_metadata_manager + .datanode_table_manager() + .batch_get(&datanode_table_keys) + .await + .context(error::TableMetadataManagerSnafu) + .map_err(BoxedError::new) + .with_context(|_| error::RetryLaterWithSourceSnafu { + reason: format!("Failed to get DatanodeTable: {table_ids:?}"), + })? + .into_iter() + .map(|(k, v)| (k.table_id, v)) + .collect(); + *from_peer_datanode_table_values = Some(datanode_table_values); + } + Ok(from_peer_datanode_table_values.as_ref().unwrap()) + } + + /// Returns the `from_peer_datanode_table_value` of [VolatileContext] if any. + /// Otherwise, returns the value retrieved from remote. + /// + /// Retry: + /// - Failed to retrieve the metadata of datanode table. + pub async fn get_from_peer_datanode_table_value( + &self, + table_id: TableId, + ) -> Result { + let datanode_table_value = self + .table_metadata_manager + .datanode_table_manager() + .get(&DatanodeTableKey { + datanode_id: self.persistent_ctx.from_peer.id, + table_id, + }) + .await + .context(error::TableMetadataManagerSnafu) + .map_err(BoxedError::new) + .with_context(|_| error::RetryLaterWithSourceSnafu { + reason: format!("Failed to get DatanodeTable: {table_id}"), + })? + .context(error::DatanodeTableNotFoundSnafu { + table_id, + datanode_id: self.persistent_ctx.from_peer.id, + })?; + Ok(datanode_table_value) } /// Notifies the RegionSupervisor to register failure detectors of failed region. 
@@ -422,11 +573,18 @@ impl Context { /// Now, we need to register the failure detector for the failed region again. pub async fn register_failure_detectors(&self) { let datanode_id = self.persistent_ctx.from_peer.id; - let region_id = self.persistent_ctx.region_id; - + let region_ids = &self.persistent_ctx.region_ids; + let detecting_regions = region_ids + .iter() + .map(|region_id| (datanode_id, *region_id)) + .collect::>(); self.region_failure_detector_controller - .register_failure_detectors(vec![(datanode_id, region_id)]) + .register_failure_detectors(detecting_regions) .await; + info!( + "Registered failure detectors after migration failures for datanode {}, regions {:?}", + datanode_id, region_ids + ); } /// Notifies the RegionSupervisor to deregister failure detectors. @@ -435,10 +593,14 @@ impl Context { /// We need to deregister the failure detectors for the original region if the procedure is finished. pub async fn deregister_failure_detectors(&self) { let datanode_id = self.persistent_ctx.from_peer.id; - let region_id = self.persistent_ctx.region_id; + let region_ids = &self.persistent_ctx.region_ids; + let detecting_regions = region_ids + .iter() + .map(|region_id| (datanode_id, *region_id)) + .collect::>(); self.region_failure_detector_controller - .deregister_failure_detectors(vec![(datanode_id, region_id)]) + .deregister_failure_detectors(detecting_regions) .await; } @@ -448,112 +610,52 @@ impl Context { /// so we need to deregister the failure detectors for the candidate region if the procedure is aborted. 
pub async fn deregister_failure_detectors_for_candidate_region(&self) { let to_peer_id = self.persistent_ctx.to_peer.id; - let region_id = self.persistent_ctx.region_id; + let region_ids = &self.persistent_ctx.region_ids; + let detecting_regions = region_ids + .iter() + .map(|region_id| (to_peer_id, *region_id)) + .collect::>(); self.region_failure_detector_controller - .deregister_failure_detectors(vec![(to_peer_id, region_id)]) + .deregister_failure_detectors(detecting_regions) .await; } - /// Returns the `table_info` of [VolatileContext] if any. - /// Otherwise, returns the value retrieved from remote. - /// - /// Retry: - /// - Failed to retrieve the metadata of table. - pub async fn get_table_info_value( - &mut self, - ) -> Result<&DeserializedValueWithBytes> { - let table_info_value = &mut self.volatile_ctx.table_info; - - if table_info_value.is_none() { - let table_id = self.persistent_ctx.region_id.table_id(); - let table_info = self - .table_metadata_manager - .table_info_manager() - .get(table_id) - .await - .context(error::TableMetadataManagerSnafu) - .map_err(BoxedError::new) - .with_context(|_| error::RetryLaterWithSourceSnafu { - reason: format!("Failed to get TableInfo: {table_id}"), - })? - .context(error::TableInfoNotFoundSnafu { table_id })?; - - *table_info_value = Some(table_info); - } - - Ok(table_info_value.as_ref().unwrap()) - } - - /// Returns the `table_info` of [VolatileContext] if any. - /// Otherwise, returns the value retrieved from remote. - /// - /// Retry: - /// - Failed to retrieve the metadata of datanode. 
- pub async fn get_from_peer_datanode_table_value(&mut self) -> Result<&DatanodeTableValue> { - let datanode_value = &mut self.volatile_ctx.from_peer_datanode_table; - - if datanode_value.is_none() { - let table_id = self.persistent_ctx.region_id.table_id(); - let datanode_id = self.persistent_ctx.from_peer.id; - - let datanode_table = self - .table_metadata_manager - .datanode_table_manager() - .get(&DatanodeTableKey { - datanode_id, - table_id, - }) - .await - .context(error::TableMetadataManagerSnafu) - .map_err(BoxedError::new) - .with_context(|_| error::RetryLaterWithSourceSnafu { - reason: format!("Failed to get DatanodeTable: ({datanode_id},{table_id})"), - })? - .context(error::DatanodeTableNotFoundSnafu { - table_id, - datanode_id, - })?; - - *datanode_value = Some(datanode_table); - } - - Ok(datanode_value.as_ref().unwrap()) - } - - /// Fetches the replay checkpoint for the given topic. - pub async fn fetch_replay_checkpoint(&self, topic: &str) -> Result> { - let region_id = self.region_id(); - let topic_region_key = TopicRegionKey::new(region_id, topic); - let value = self + /// Fetches the replay checkpoints for the given topic region keys. + pub async fn get_replay_checkpoints( + &self, + topic_region_keys: Vec>, + ) -> Result> { + let topic_region_values = self .table_metadata_manager .topic_region_manager() - .get(topic_region_key) + .batch_get(topic_region_keys) .await .context(error::TableMetadataManagerSnafu)?; - Ok(value.and_then(|value| value.checkpoint)) - } + let replay_checkpoints = topic_region_values + .into_iter() + .flat_map(|(key, value)| value.checkpoint.map(|value| (key, value))) + .collect::>(); - /// Returns the [RegionId]. - pub fn region_id(&self) -> RegionId { - self.persistent_ctx.region_id + Ok(replay_checkpoints) } /// Broadcasts the invalidate table cache message. 
pub async fn invalidate_table_cache(&self) -> Result<()> { - let table_id = self.region_id().table_id(); + let table_ids = self.region_table_ids(); + let mut cache_idents = Vec::with_capacity(table_ids.len()); + for table_id in &table_ids { + cache_idents.push(CacheIdent::TableId(*table_id)); + } // ignore the result let ctx = common_meta::cache_invalidator::Context::default(); - let _ = self - .cache_invalidator - .invalidate(&ctx, &[CacheIdent::TableId(table_id)]) - .await; + let _ = self.cache_invalidator.invalidate(&ctx, &cache_idents).await; Ok(()) } /// Returns the [PersistentContext] of the procedure. - pub fn persistent_ctx(&self) -> Arc { + pub fn persistent_ctx(&self) -> PersistentContext { self.persistent_ctx.clone() } } @@ -595,7 +697,7 @@ pub struct RegionMigrationData<'a> { pub(crate) struct RegionMigrationProcedure { state: Box, context: Context, - _guard: Option, + _guards: Vec, } impl RegionMigrationProcedure { @@ -604,22 +706,22 @@ impl RegionMigrationProcedure { pub fn new( persistent_context: PersistentContext, context_factory: impl ContextFactory, - guard: Option, + guards: Vec, ) -> Self { let state = Box::new(RegionMigrationStart {}); - Self::new_inner(state, persistent_context, context_factory, guard) + Self::new_inner(state, persistent_context, context_factory, guards) } fn new_inner( state: Box, persistent_context: PersistentContext, context_factory: impl ContextFactory, - guard: Option, + guards: Vec, ) -> Self { Self { state, context: context_factory.new_context(persistent_context), - _guard: guard, + _guards: guards, } } @@ -632,20 +734,26 @@ impl RegionMigrationProcedure { persistent_ctx, state, } = serde_json::from_str(json).context(FromJsonSnafu)?; + let guards = persistent_ctx + .region_ids + .iter() + .flat_map(|region_id| { + tracker.insert_running_procedure(&RegionMigrationProcedureTask { + region_id: *region_id, + from_peer: persistent_ctx.from_peer.clone(), + to_peer: persistent_ctx.to_peer.clone(), + timeout: 
persistent_ctx.timeout, + trigger_reason: persistent_ctx.trigger_reason, + }) + }) + .collect::>(); - let guard = tracker.insert_running_procedure(&RegionMigrationProcedureTask { - region_id: persistent_ctx.region_id, - from_peer: persistent_ctx.from_peer.clone(), - to_peer: persistent_ctx.to_peer.clone(), - timeout: persistent_ctx.timeout, - trigger_reason: persistent_ctx.trigger_reason, - }); let context = context_factory.new_context(persistent_ctx); Ok(Self { state, context, - _guard: guard, + _guards: guards, }) } @@ -653,27 +761,25 @@ impl RegionMigrationProcedure { let _timer = METRIC_META_REGION_MIGRATION_EXECUTE .with_label_values(&["rollback"]) .start_timer(); - - let table_id = self.context.region_id().table_id(); - let region_id = self.context.region_id(); - let table_metadata_manager = self.context.table_metadata_manager.clone(); - let table_route = self.context.get_table_route_value().await?; - - // Safety: It must be a physical table route. - let downgraded = table_route - .region_routes() - .unwrap() - .iter() - .filter(|route| route.region.id == region_id) - .any(|route| route.is_leader_downgrading()); - - if downgraded { - let table_lock = TableLock::Write(region_id.table_id()).into(); + let ctx = &self.context; + let table_regions = ctx.persistent_ctx.table_regions(); + for (table_id, regions) in table_regions { + let table_lock = TableLock::Write(table_id).into(); let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await; - info!("Rollbacking downgraded region leader table route, region: {region_id}"); - table_metadata_manager + let table_route = ctx.get_table_route_value(table_id).await?; + let region_routes = table_route.region_routes().unwrap(); + let downgraded = region_routes + .iter() + .filter(|route| regions.contains(&route.region.id)) + .any(|route| route.is_leader_downgrading()); + if downgraded { + info!( + "Rollbacking downgraded region leader table route, table: {table_id}, regions: {regions:?}" + ); + let 
table_metadata_manager = &ctx.table_metadata_manager; + table_metadata_manager .update_leader_region_status(table_id, &table_route, |route| { - if route.region.id == region_id { + if regions.contains(&route.region.id) { Some(None) } else { None @@ -683,13 +789,13 @@ impl RegionMigrationProcedure { .context(error::TableMetadataManagerSnafu) .map_err(BoxedError::new) .with_context(|_| error::RetryLaterWithSourceSnafu { - reason: format!("Failed to update the table route during the rollback downgraded leader region: {region_id}"), + reason: format!("Failed to update the table route during the rollback downgraded leader region: {regions:?}"), })?; - self.context - .deregister_failure_detectors_for_candidate_region() - .await; + } } - + self.context + .deregister_failure_detectors_for_candidate_region() + .await; self.context.register_failure_detectors().await; Ok(()) @@ -732,14 +838,14 @@ impl Procedure for RegionMigrationProcedure { Err(ProcedureError::retry_later(e)) } else { // Consumes the opening region guard before deregistering the failure detectors. 
- self.context.volatile_ctx.opening_region_guard.take(); + self.context.volatile_ctx.opening_region_guards.clear(); self.context .deregister_failure_detectors_for_candidate_region() .await; error!( e; - "Region migration procedure failed, region_id: {}, from_peer: {}, to_peer: {}, {}", - self.context.region_id(), + "Region migration procedure failed, regions: {:?}, from_peer: {}, to_peer: {}, {}", + self.context.persistent_ctx.region_ids, self.context.persistent_ctx.from_peer, self.context.persistent_ctx.to_peer, self.context.volatile_ctx.metrics, @@ -766,7 +872,7 @@ impl Procedure for RegionMigrationProcedure { } fn user_metadata(&self) -> Option { - Some(UserMetadata::new(self.context.persistent_ctx())) + Some(UserMetadata::new(Arc::new(self.context.persistent_ctx()))) } } @@ -780,7 +886,6 @@ mod tests { use common_meta::key::test_utils::new_test_table_info; use common_meta::rpc::router::{Region, RegionRoute}; - use super::update_metadata::UpdateMetadata; use super::*; use crate::handler::HeartbeatMailbox; use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion; @@ -803,7 +908,7 @@ mod tests { let env = TestingEnv::new(); let context = env.context_factory(); - let procedure = RegionMigrationProcedure::new(persistent_context, context, None); + let procedure = RegionMigrationProcedure::new(persistent_context, context, vec![]); let key = procedure.lock_key(); let keys = key.keys_to_lock().cloned().collect::>(); @@ -820,10 +925,10 @@ mod tests { let env = TestingEnv::new(); let context = env.context_factory(); - let procedure = RegionMigrationProcedure::new(persistent_context, context, None); + let procedure = RegionMigrationProcedure::new(persistent_context, context, vec![]); let serialized = procedure.dump().unwrap(); - let expected = 
r#"{"persistent_ctx":{"catalog":"greptime","schema":"public","from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105,"timeout":"10s","trigger_reason":"Unknown"},"state":{"region_migration_state":"RegionMigrationStart"}}"#; + let expected = r#"{"persistent_ctx":{"catalog":"greptime","schema":"public","from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_ids":[4398046511105],"timeout":"10s","trigger_reason":"Unknown"},"state":{"region_migration_state":"RegionMigrationStart"}}"#; assert_eq!(expected, serialized); } @@ -864,7 +969,7 @@ mod tests { let persistent_context = new_persistent_context(); let context_factory = env.context_factory(); let state = Box::::default(); - RegionMigrationProcedure::new_inner(state, persistent_context, context_factory, None) + RegionMigrationProcedure::new_inner(state, persistent_context, context_factory, vec![]) } let ctx = TestingEnv::procedure_context(); @@ -887,7 +992,9 @@ mod tests { let mut procedure = RegionMigrationProcedure::from_json(&serialized, context_factory, tracker.clone()) .unwrap(); - assert!(tracker.contains(procedure.context.persistent_ctx.region_id)); + for region_id in &procedure.context.persistent_ctx.region_ids { + assert!(tracker.contains(*region_id)); + } for _ in 1..3 { status = Some(procedure.execute(&ctx).await.unwrap()); @@ -927,9 +1034,34 @@ mod tests { vec![ // MigrationStart Step::next( - "Should be the update metadata for downgrading", + "Should be the open candidate region", None, - Assertion::simple(assert_update_metadata_downgrade, assert_need_persist), + Assertion::simple(assert_open_candidate_region, assert_need_persist), + ), + // OpenCandidateRegion + Step::next( + "Should be the flush leader region", + Some(mock_datanode_reply( + to_peer_id, + Arc::new(|id| Ok(new_open_region_reply(id, true, None))), + )), + Assertion::simple(assert_flush_leader_region, assert_no_persist), + ), + // Flush Leader Region + Step::next( + "Should be the flush leader 
region", + Some(mock_datanode_reply( + from_peer_id, + Arc::new(move |id| { + Ok(new_flush_region_reply_for_region( + id, + RegionId::new(1024, 1), + true, + None, + )) + }), + )), + Assertion::simple(assert_update_metadata_downgrade, assert_no_persist), ), // UpdateMetadata::Downgrade Step::next( @@ -988,7 +1120,7 @@ mod tests { let to_peer_id = persistent_context.to_peer.id; let from_peer = persistent_context.from_peer.clone(); let to_peer = persistent_context.to_peer.clone(); - let region_id = persistent_context.region_id; + let region_id = persistent_context.region_ids[0]; let table_info = new_test_table_info(1024, vec![1]).into(); let region_routes = vec![RegionRoute { region: Region::new_test(region_id), @@ -1015,61 +1147,6 @@ mod tests { runner.suite.verify_table_metadata().await; } - #[tokio::test] - async fn test_procedure_flow_idempotent() { - common_telemetry::init_default_ut_logging(); - - let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1)); - let state = Box::new(RegionMigrationStart); - - // The table metadata. 
- let from_peer_id = persistent_context.from_peer.id; - let to_peer_id = persistent_context.to_peer.id; - let from_peer = persistent_context.from_peer.clone(); - let to_peer = persistent_context.to_peer.clone(); - let region_id = persistent_context.region_id; - let table_info = new_test_table_info(1024, vec![1]).into(); - let region_routes = vec![RegionRoute { - region: Region::new_test(region_id), - leader_peer: Some(from_peer), - follower_peers: vec![to_peer], - ..Default::default() - }]; - - let suite = ProcedureMigrationTestSuite::new(persistent_context, state); - suite.init_table_metadata(table_info, region_routes).await; - - let steps = procedure_flow_steps(from_peer_id, to_peer_id); - let setup_to_latest_persisted_state = Step::setup( - "Sets state to UpdateMetadata::Downgrade", - merge_before_test_fn(vec![ - setup_state(Arc::new(|| Box::new(UpdateMetadata::Downgrade))), - Arc::new(reset_volatile_ctx), - ]), - ); - - let steps = [ - steps.clone(), - vec![setup_to_latest_persisted_state.clone()], - steps.clone()[1..].to_vec(), - vec![setup_to_latest_persisted_state], - steps.clone()[1..].to_vec(), - ] - .concat(); - let timer = Instant::now(); - - // Run the table tests. - let runner = ProcedureMigrationSuiteRunner::new(suite) - .steps(steps.clone()) - .run_once() - .await; - - // Ensure it didn't run into the slow path. - assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS / 2); - - runner.suite.verify_table_metadata().await; - } - #[tokio::test] async fn test_procedure_flow_open_candidate_region_retryable_error() { common_telemetry::init_default_ut_logging(); @@ -1080,7 +1157,7 @@ mod tests { // The table metadata. 
let to_peer_id = persistent_context.to_peer.id; let from_peer = persistent_context.from_peer.clone(); - let region_id = persistent_context.region_id; + let region_id = persistent_context.region_ids[0]; let table_info = new_test_table_info(1024, vec![1]).into(); let region_routes = vec![RegionRoute { region: Region::new_test(region_id), @@ -1168,13 +1245,12 @@ mod tests { let from_peer_id = persistent_context.from_peer.id; let to_peer_id = persistent_context.to_peer.id; let from_peer = persistent_context.from_peer.clone(); - let to_peer = persistent_context.to_peer.clone(); - let region_id = persistent_context.region_id; + let region_id = persistent_context.region_ids[0]; let table_info = new_test_table_info(1024, vec![1]).into(); let region_routes = vec![RegionRoute { region: Region::new_test(region_id), leader_peer: Some(from_peer), - follower_peers: vec![to_peer], + follower_peers: vec![], ..Default::default() }]; @@ -1184,9 +1260,34 @@ mod tests { let steps = vec![ // MigrationStart Step::next( - "Should be the update metadata for downgrading", + "Should be the open candidate region", None, - Assertion::simple(assert_update_metadata_downgrade, assert_need_persist), + Assertion::simple(assert_open_candidate_region, assert_need_persist), + ), + // OpenCandidateRegion + Step::next( + "Should be the flush leader region", + Some(mock_datanode_reply( + to_peer_id, + Arc::new(|id| Ok(new_open_region_reply(id, true, None))), + )), + Assertion::simple(assert_flush_leader_region, assert_no_persist), + ), + // Flush Leader Region + Step::next( + "Should be the flush leader region", + Some(mock_datanode_reply( + from_peer_id, + Arc::new(move |id| { + Ok(new_flush_region_reply_for_region( + id, + RegionId::new(1024, 1), + true, + None, + )) + }), + )), + Assertion::simple(assert_update_metadata_downgrade, assert_no_persist), ), // UpdateMetadata::Downgrade Step::next( @@ -1230,9 +1331,9 @@ mod tests { ]; let setup_to_latest_persisted_state = Step::setup( - "Sets state to 
UpdateMetadata::Downgrade", + "Sets state to OpenCandidateRegion", merge_before_test_fn(vec![ - setup_state(Arc::new(|| Box::new(UpdateMetadata::Downgrade))), + setup_state(Arc::new(|| Box::new(OpenCandidateRegion))), Arc::new(reset_volatile_ctx), ]), ); @@ -1264,7 +1365,7 @@ mod tests { let to_peer_id = persistent_context.to_peer.id; let from_peer_id = persistent_context.from_peer.id; let from_peer = persistent_context.from_peer.clone(); - let region_id = persistent_context.region_id; + let region_id = persistent_context.region_ids[0]; let table_info = new_test_table_info(1024, vec![1]).into(); let region_routes = vec![RegionRoute { region: Region::new_test(region_id), diff --git a/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs b/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs index 5a8beb7ca4..c20c7fede2 100644 --- a/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs +++ b/src/meta-srv/src/procedure/region_migration/close_downgraded_region.rs @@ -19,7 +19,6 @@ use api::v1::meta::MailboxMessage; use common_meta::RegionIdent; use common_meta::distributed_time_constants::REGION_LEASE_SECS; use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; -use common_meta::key::datanode_table::RegionInfo; use common_procedure::{Context as ProcedureContext, Status}; use common_telemetry::{info, warn}; use serde::{Deserialize, Serialize}; @@ -47,12 +46,12 @@ impl State for CloseDowngradedRegion { ) -> Result<(Box, Status)> { if let Err(err) = self.close_downgraded_leader_region(ctx).await { let downgrade_leader_datanode = &ctx.persistent_ctx.from_peer; - let region_id = ctx.region_id(); - warn!(err; "Failed to close downgraded leader region: {region_id} on datanode {:?}", downgrade_leader_datanode); + let region_ids = &ctx.persistent_ctx.region_ids; + warn!(err; "Failed to close downgraded leader regions: {region_ids:?} on datanode {:?}", downgrade_leader_datanode); } info!( - "Region 
migration is finished: region_id: {}, from_peer: {}, to_peer: {}, trigger_reason: {}, {}", - ctx.region_id(), + "Region migration is finished: regions: {:?}, from_peer: {}, to_peer: {}, trigger_reason: {}, {}", + ctx.persistent_ctx.region_ids, ctx.persistent_ctx.from_peer, ctx.persistent_ctx.to_peer, ctx.persistent_ctx.trigger_reason, @@ -74,28 +73,30 @@ impl CloseDowngradedRegion { async fn build_close_region_instruction(&self, ctx: &mut Context) -> Result { let pc = &ctx.persistent_ctx; let downgrade_leader_datanode_id = pc.from_peer.id; - let table_id = pc.region_id.table_id(); - let region_number = pc.region_id.region_number(); - let datanode_table_value = ctx.get_from_peer_datanode_table_value().await?; + let region_ids = &ctx.persistent_ctx.region_ids; + let mut idents = Vec::with_capacity(region_ids.len()); - let RegionInfo { engine, .. } = datanode_table_value.region_info.clone(); + for region_id in region_ids { + idents.push(RegionIdent { + datanode_id: downgrade_leader_datanode_id, + table_id: region_id.table_id(), + region_number: region_id.region_number(), + // The `engine` field is not used for closing region. + engine: String::new(), + }); + } - Ok(Instruction::CloseRegions(vec![RegionIdent { - datanode_id: downgrade_leader_datanode_id, - table_id, - region_number, - engine, - }])) + Ok(Instruction::CloseRegions(idents)) } /// Closes the downgraded leader region. 
async fn close_downgraded_leader_region(&self, ctx: &mut Context) -> Result<()> { let close_instruction = self.build_close_region_instruction(ctx).await?; - let region_id = ctx.region_id(); + let region_ids = &ctx.persistent_ctx.region_ids; let pc = &ctx.persistent_ctx; let downgrade_leader_datanode = &pc.from_peer; let msg = MailboxMessage::json_message( - &format!("Close downgraded region: {}", region_id), + &format!("Close downgraded regions: {:?}", region_ids), &format!("Metasrv@{}", ctx.server_addr()), &format!( "Datanode-{}@{}", @@ -118,8 +119,8 @@ impl CloseDowngradedRegion { Ok(msg) => { let reply = HeartbeatMailbox::json_reply(&msg)?; info!( - "Received close downgraded leade region reply: {:?}, region: {}", - reply, region_id + "Received close downgraded leade region reply: {:?}, region: {:?}", + reply, region_ids ); let InstructionReply::CloseRegions(SimpleReply { result, error }) = reply else { return error::UnexpectedInstructionReplySnafu { @@ -134,7 +135,7 @@ impl CloseDowngradedRegion { } else { error::UnexpectedSnafu { violated: format!( - "Failed to close downgraded leader region: {region_id} on datanode {:?}, error: {error:?}", + "Failed to close downgraded leader region: {region_ids:?} on datanode {:?}, error: {error:?}", downgrade_leader_datanode, ), } diff --git a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs index fb4065748c..5cc8f064fb 100644 --- a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs @@ -22,7 +22,7 @@ use common_meta::instruction::{ DowngradeRegion, DowngradeRegionReply, DowngradeRegionsReply, Instruction, InstructionReply, }; use common_procedure::{Context as ProcedureContext, Status}; -use common_telemetry::{error, info, warn}; +use common_telemetry::{debug, error, info, warn}; use common_time::util::current_time_millis; use 
serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; @@ -70,30 +70,30 @@ impl State for DowngradeLeaderRegion { Ok(_) => { // Do nothing info!( - "Downgraded region leader success, region: {}", - ctx.persistent_ctx.region_id + "Downgraded region leader success, region: {:?}", + ctx.persistent_ctx.region_ids ); } Err(error::Error::ExceededDeadline { .. }) => { info!( - "Downgrade region leader exceeded deadline, region: {}", - ctx.persistent_ctx.region_id + "Downgrade region leader exceeded deadline, region: {:?}", + ctx.persistent_ctx.region_ids ); // Rollbacks the metadata if procedure is timeout return Ok((Box::new(UpdateMetadata::Rollback), Status::executing(false))); } Err(err) => { - error!(err; "Occurs non-retryable error, region: {}", ctx.persistent_ctx.region_id); + error!(err; "Occurs non-retryable error, region: {:?}", ctx.persistent_ctx.region_ids); if let Some(deadline) = ctx.volatile_ctx.leader_region_lease_deadline.as_ref() { info!( - "Running into the downgrade region leader slow path, region: {}, sleep until {:?}", - ctx.persistent_ctx.region_id, deadline + "Running into the downgrade region leader slow path, region: {:?}, sleep until {:?}", + ctx.persistent_ctx.region_ids, deadline ); tokio::time::sleep_until(*deadline).await; } else { warn!( - "Leader region lease deadline is not set, region: {}", - ctx.persistent_ctx.region_id + "Leader region lease deadline is not set, region: {:?}", + ctx.persistent_ctx.region_ids ); } } @@ -118,12 +118,76 @@ impl DowngradeLeaderRegion { ctx: &Context, flush_timeout: Duration, ) -> Instruction { - let pc = &ctx.persistent_ctx; - let region_id = pc.region_id; - Instruction::DowngradeRegions(vec![DowngradeRegion { + let region_ids = &ctx.persistent_ctx.region_ids; + let mut downgrade_regions = Vec::with_capacity(region_ids.len()); + for region_id in region_ids { + downgrade_regions.push(DowngradeRegion { + region_id: *region_id, + flush_timeout: Some(flush_timeout), + }); + } + + 
Instruction::DowngradeRegions(downgrade_regions) + } + + fn handle_downgrade_region_reply( + &self, + ctx: &mut Context, + reply: &DowngradeRegionReply, + now: &Instant, + ) -> Result<()> { + let leader = &ctx.persistent_ctx.from_peer; + let DowngradeRegionReply { region_id, - flush_timeout: Some(flush_timeout), - }]) + last_entry_id, + metadata_last_entry_id, + exists, + error, + } = reply; + + if error.is_some() { + return error::RetryLaterSnafu { + reason: format!( + "Failed to downgrade the region {} on datanode {:?}, error: {:?}, elapsed: {:?}", + region_id, leader, error, now.elapsed() + ), + } + .fail(); + } + + if !exists { + warn!( + "Trying to downgrade the region {} on datanode {:?}, but region doesn't exist!, elapsed: {:?}", + region_id, + leader, + now.elapsed() + ); + } else { + info!( + "Region {} leader is downgraded on datanode {:?}, last_entry_id: {:?}, metadata_last_entry_id: {:?}, elapsed: {:?}", + region_id, + leader, + last_entry_id, + metadata_last_entry_id, + now.elapsed() + ); + } + + if let Some(last_entry_id) = last_entry_id { + debug!( + "set last_entry_id: {:?}, region_id: {:?}", + last_entry_id, region_id + ); + ctx.volatile_ctx + .set_last_entry_id(*region_id, *last_entry_id); + } + + if let Some(metadata_last_entry_id) = metadata_last_entry_id { + ctx.volatile_ctx + .set_metadata_last_entry_id(*region_id, *metadata_last_entry_id); + } + + Ok(()) } /// Tries to downgrade a leader region. @@ -140,7 +204,7 @@ impl DowngradeLeaderRegion { /// - [ExceededDeadline](error::Error::ExceededDeadline) /// - Invalid JSON. 
async fn downgrade_region(&self, ctx: &mut Context) -> Result<()> { - let region_id = ctx.persistent_ctx.region_id; + let region_ids = &ctx.persistent_ctx.region_ids; let operation_timeout = ctx.next_operation_timeout() .context(error::ExceededDeadlineSnafu { @@ -150,7 +214,7 @@ impl DowngradeLeaderRegion { let leader = &ctx.persistent_ctx.from_peer; let msg = MailboxMessage::json_message( - &format!("Downgrade leader region: {}", region_id), + &format!("Downgrade leader regions: {:?}", region_ids), &format!("Metasrv@{}", ctx.server_addr()), &format!("Datanode-{}@{}", leader.id, leader.addr), common_time::util::current_time_millis(), @@ -168,9 +232,9 @@ impl DowngradeLeaderRegion { Ok(msg) => { let reply = HeartbeatMailbox::json_reply(&msg)?; info!( - "Received downgrade region reply: {:?}, region: {}, elapsed: {:?}", + "Received downgrade region reply: {:?}, region: {:?}, elapsed: {:?}", reply, - region_id, + region_ids, now.elapsed() ); let InstructionReply::DowngradeRegions(DowngradeRegionsReply { replies }) = reply @@ -182,57 +246,14 @@ impl DowngradeLeaderRegion { .fail(); }; - // TODO(weny): handle multiple replies. 
- let DowngradeRegionReply { - region_id, - last_entry_id, - metadata_last_entry_id, - exists, - error, - } = &replies[0]; - - if error.is_some() { - return error::RetryLaterSnafu { - reason: format!( - "Failed to downgrade the region {} on datanode {:?}, error: {:?}, elapsed: {:?}", - region_id, leader, error, now.elapsed() - ), - } - .fail(); + for reply in replies { + self.handle_downgrade_region_reply(ctx, &reply, &now)?; } - - if !exists { - warn!( - "Trying to downgrade the region {} on datanode {:?}, but region doesn't exist!, elapsed: {:?}", - region_id, - leader, - now.elapsed() - ); - } else { - info!( - "Region {} leader is downgraded on datanode {:?}, last_entry_id: {:?}, metadata_last_entry_id: {:?}, elapsed: {:?}", - region_id, - leader, - last_entry_id, - metadata_last_entry_id, - now.elapsed() - ); - } - - if let Some(last_entry_id) = last_entry_id { - ctx.volatile_ctx.set_last_entry_id(*last_entry_id); - } - - if let Some(metadata_last_entry_id) = metadata_last_entry_id { - ctx.volatile_ctx - .set_metadata_last_entry_id(*metadata_last_entry_id); - } - Ok(()) } Err(error::Error::MailboxTimeout { .. 
}) => { let reason = format!( - "Mailbox received timeout for downgrade leader region {region_id} on datanode {:?}, elapsed: {:?}", + "Mailbox received timeout for downgrade leader region {region_ids:?} on datanode {:?}, elapsed: {:?}", leader, now.elapsed() ); @@ -248,7 +269,7 @@ impl DowngradeLeaderRegion { let last_connection_at = match find_datanode_lease_value(&ctx.in_memory, leader.id).await { Ok(lease_value) => lease_value.map(|lease_value| lease_value.timestamp_millis), Err(err) => { - error!(err; "Failed to find datanode lease value for datanode: {}, during region migration, region: {}", leader, ctx.persistent_ctx.region_id); + error!(err; "Failed to find datanode lease value for datanode: {}, during region migration, region: {:?}", leader, ctx.persistent_ctx.region_ids); return; } }; @@ -266,8 +287,8 @@ impl DowngradeLeaderRegion { if elapsed >= (REGION_LEASE_SECS * 1000) as i64 { ctx.volatile_ctx.reset_leader_region_lease_deadline(); info!( - "Datanode {}({}) has been disconnected for longer than the region lease period ({:?}), reset leader region lease deadline to None, region: {}", - leader, last_connection_at, region_lease, ctx.persistent_ctx.region_id + "Datanode {}({}) has been disconnected for longer than the region lease period ({:?}), reset leader region lease deadline to None, region: {:?}", + leader, last_connection_at, region_lease, ctx.persistent_ctx.region_ids ); } else if elapsed > 0 { // `now - last_connection_at` < REGION_LEASE_SECS * 1000 @@ -277,23 +298,23 @@ impl DowngradeLeaderRegion { ctx.volatile_ctx .set_leader_region_lease_deadline(lease_timeout); info!( - "Datanode {}({}) last connected {:?} ago, updated leader region lease deadline to {:?}, region: {}", + "Datanode {}({}) last connected {:?} ago, updated leader region lease deadline to {:?}, region: {:?}", leader, last_connection_at, elapsed, ctx.volatile_ctx.leader_region_lease_deadline, - ctx.persistent_ctx.region_id + ctx.persistent_ctx.region_ids ); } else { warn!( - 
"Datanode {} has invalid last connection timestamp: {} (which is after current time: {}), region: {}", - leader, last_connection_at, now, ctx.persistent_ctx.region_id + "Datanode {} has invalid last connection timestamp: {} (which is after current time: {}), region: {:?}", + leader, last_connection_at, now, ctx.persistent_ctx.region_ids ) } } else { warn!( - "Failed to find last connection time for datanode {}, unable to update region lease deadline, region: {}", - leader, ctx.persistent_ctx.region_id + "Failed to find last connection time for datanode {}, unable to update region lease deadline, region: {:?}", + leader, ctx.persistent_ctx.region_ids ) } } @@ -318,19 +339,20 @@ impl DowngradeLeaderRegion { retry += 1; // Throws the error immediately if the procedure exceeded the deadline. if matches!(err, error::Error::ExceededDeadline { .. }) { - error!(err; "Failed to downgrade region leader, region: {}, exceeded deadline", ctx.persistent_ctx.region_id); + error!(err; "Failed to downgrade region leader, regions: {:?}, exceeded deadline", ctx.persistent_ctx.region_ids); return Err(err); } else if matches!(err, error::Error::PusherNotFound { .. }) { // Throws the error immediately if the datanode is unreachable. 
- error!(err; "Failed to downgrade region leader, region: {}, datanode({}) is unreachable(PusherNotFound)", ctx.persistent_ctx.region_id, ctx.persistent_ctx.from_peer.id); + error!(err; "Failed to downgrade region leader, regions: {:?}, datanode({}) is unreachable(PusherNotFound)", ctx.persistent_ctx.region_ids, ctx.persistent_ctx.from_peer.id); self.update_leader_region_lease_deadline(ctx).await; return Err(err); } else if err.is_retryable() && retry < self.optimistic_retry { - error!(err; "Failed to downgrade region leader, region: {}, retry later", ctx.persistent_ctx.region_id); + error!(err; "Failed to downgrade region leader, regions: {:?}, retry later", ctx.persistent_ctx.region_ids); sleep(self.retry_initial_interval).await; } else { return Err(BoxedError::new(err)).context(error::DowngradeLeaderSnafu { - region_id: ctx.persistent_ctx.region_id, + // TODO(weny): handle multiple regions. + region_id: ctx.persistent_ctx.region_ids[0], })?; } } else { @@ -372,17 +394,17 @@ mod tests { schema: "public".into(), from_peer: Peer::empty(1), to_peer: Peer::empty(2), - region_id: RegionId::new(1024, 1), + region_ids: vec![RegionId::new(1024, 1)], timeout: Duration::from_millis(1000), trigger_reason: RegionMigrationTriggerReason::Manual, } } async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap) { - let table_info = - new_test_table_info(ctx.persistent_ctx.region_id.table_id(), vec![1]).into(); + let region_id = ctx.persistent_ctx.region_ids[0]; + let table_info = new_test_table_info(region_id.table_id(), vec![1]).into(); let region_routes = vec![RegionRoute { - region: Region::new_test(ctx.persistent_ctx.region_id), + region: Region::new_test(region_id), leader_peer: Some(ctx.persistent_ctx.from_peer.clone()), follower_peers: vec![ctx.persistent_ctx.to_peer.clone()], ..Default::default() @@ -590,7 +612,13 @@ mod tests { }); state.downgrade_region_with_retry(&mut ctx).await.unwrap(); - assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, Some(1)); + 
assert_eq!( + ctx.volatile_ctx + .leader_region_last_entry_ids + .get(&RegionId::new(0, 0)) + .cloned(), + Some(1) + ); assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none()); } @@ -636,7 +664,7 @@ mod tests { .await .unwrap_err(); assert_matches!(err, error::Error::DowngradeLeader { .. }); - assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, None); + // assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, None); // Should remain no change. assert_eq!( ctx.volatile_ctx.leader_region_lease_deadline.unwrap(), @@ -671,7 +699,13 @@ mod tests { let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap(); let elapsed = timer.elapsed().as_secs(); assert!(elapsed < REGION_LEASE_SECS / 2); - assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, Some(1)); + assert_eq!( + ctx.volatile_ctx + .leader_region_last_entry_ids + .get(&RegionId::new(0, 0)) + .cloned(), + Some(1) + ); assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none()); let _ = next diff --git a/src/meta-srv/src/procedure/region_migration/flush_leader_region.rs b/src/meta-srv/src/procedure/region_migration/flush_leader_region.rs index b5cc1a955c..f9e5900cbb 100644 --- a/src/meta-srv/src/procedure/region_migration/flush_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/flush_leader_region.rs @@ -15,7 +15,7 @@ use std::any::Any; use api::v1::meta::MailboxMessage; -use common_meta::instruction::{FlushRegions, Instruction, InstructionReply}; +use common_meta::instruction::{FlushErrorStrategy, FlushRegions, Instruction, InstructionReply}; use common_procedure::{Context as ProcedureContext, Status}; use common_telemetry::{info, warn}; use serde::{Deserialize, Serialize}; @@ -64,8 +64,10 @@ impl PreFlushRegion { /// Builds flush leader region instruction. 
fn build_flush_leader_region_instruction(&self, ctx: &Context) -> Instruction { let pc = &ctx.persistent_ctx; - let region_id = pc.region_id; - Instruction::FlushRegions(FlushRegions::sync_single(region_id)) + Instruction::FlushRegions(FlushRegions::sync_batch( + pc.region_ids.clone(), + FlushErrorStrategy::TryAll, + )) } /// Tries to flush a leader region. @@ -88,11 +90,11 @@ impl PreFlushRegion { operation: "Flush leader region", })?; let flush_instruction = self.build_flush_leader_region_instruction(ctx); - let region_id = ctx.persistent_ctx.region_id; + let region_ids = &ctx.persistent_ctx.region_ids; let leader = &ctx.persistent_ctx.from_peer; let msg = MailboxMessage::json_message( - &format!("Flush leader region: {}", region_id), + &format!("Flush leader region: {:?}", region_ids), &format!("Metasrv@{}", ctx.server_addr()), &format!("Datanode-{}@{}", leader.id, leader.addr), common_time::util::current_time_millis(), @@ -111,32 +113,42 @@ impl PreFlushRegion { Ok(msg) => { let reply = HeartbeatMailbox::json_reply(&msg)?; info!( - "Received flush leader region reply: {:?}, region: {}, elapsed: {:?}", + "Received flush leader region reply: {:?}, region: {:?}, elapsed: {:?}", reply, - region_id, + region_ids, now.elapsed() ); let reply_result = match reply { InstructionReply::FlushRegions(flush_reply) => { - if flush_reply.results.len() != 1 { + if flush_reply.results.len() != region_ids.len() { return error::UnexpectedInstructionReplySnafu { mailbox_message: msg.to_string(), - reason: "expect single region flush result", + reason: format!( + "expect {} region flush result, but got {}", + region_ids.len(), + flush_reply.results.len() + ), } .fail(); } - let (reply_region_id, result) = &flush_reply.results[0]; - if *reply_region_id != region_id { - return error::UnexpectedInstructionReplySnafu { - mailbox_message: msg.to_string(), - reason: "flush reply region ID mismatch", - } - .fail(); - } - match result { - Ok(()) => (true, None), - Err(err) => (false, 
Some(err.clone())), + + match flush_reply.overall_success { + true => (true, None), + false => ( + false, + Some( + flush_reply + .results + .iter() + .filter_map(|(region_id, result)| match result { + Ok(_) => None, + Err(e) => Some(format!("{}: {}", region_id, e)), + }) + .collect::>() + .join("; "), + ), + ), } } _ => { @@ -149,15 +161,15 @@ impl PreFlushRegion { }; let (result, error) = reply_result; - if error.is_some() { + if let Some(error) = error { warn!( - "Failed to flush leader region {} on datanode {:?}, error: {:?}. Skip flush operation.", - region_id, leader, error + "Failed to flush leader regions {:?} on datanode {:?}, error: {}. Skip flush operation.", + region_ids, leader, &error ); } else if result { info!( - "The flush leader region {} on datanode {:?} is successful, elapsed: {:?}", - region_id, + "The flush leader regions {:?} on datanode {:?} is successful, elapsed: {:?}", + region_ids, leader, now.elapsed() ); @@ -166,15 +178,15 @@ impl PreFlushRegion { Ok(()) } Err(Error::MailboxTimeout { .. }) => error::ExceededDeadlineSnafu { - operation: "Flush leader region", + operation: "Flush leader regions", } .fail(), Err(err) => Err(err), }, Err(Error::PusherNotFound { .. }) => { warn!( - "Failed to flush leader region({}), the datanode({}) is unreachable(PusherNotFound). Skip flush operation.", - region_id, leader + "Failed to flush leader regions({:?}), the datanode({}) is unreachable(PusherNotFound). 
Skip flush operation.", + region_ids, leader ); Ok(()) } @@ -268,7 +280,7 @@ mod tests { // to_peer: 2 let persistent_context = new_persistent_context(); let from_peer_id = persistent_context.from_peer.id; - let region_id = persistent_context.region_id; + let region_id = persistent_context.region_ids[0]; let mut env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); let mailbox_ctx = env.mailbox_context(); @@ -297,7 +309,7 @@ mod tests { // to_peer: 2 let persistent_context = new_persistent_context(); let from_peer_id = persistent_context.from_peer.id; - let region_id = persistent_context.region_id; + let region_id = persistent_context.region_ids[0]; let mut env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); let mailbox_ctx = env.mailbox_context(); diff --git a/src/meta-srv/src/procedure/region_migration/manager.rs b/src/meta-srv/src/procedure/region_migration/manager.rs index 563b0f290d..782d2fe7ec 100644 --- a/src/meta-srv/src/procedure/region_migration/manager.rs +++ b/src/meta-srv/src/procedure/region_migration/manager.rs @@ -387,14 +387,14 @@ impl RegionMigrationManager { PersistentContext { catalog: catalog_name, schema: schema_name, - region_id, + region_ids: vec![region_id], from_peer, to_peer, timeout, trigger_reason, }, self.context_factory.clone(), - Some(guard), + vec![guard], ); let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); let procedure_id = procedure_with_id.id; diff --git a/src/meta-srv/src/procedure/region_migration/migration_abort.rs b/src/meta-srv/src/procedure/region_migration/migration_abort.rs index a25443c815..f3ad8052de 100644 --- a/src/meta-srv/src/procedure/region_migration/migration_abort.rs +++ b/src/meta-srv/src/procedure/region_migration/migration_abort.rs @@ -44,9 +44,9 @@ impl State for RegionMigrationAbort { _procedure_ctx: &ProcedureContext, ) -> Result<(Box, Status)> { warn!( - "Region migration is aborted: {}, 
region_id: {}, from_peer: {}, to_peer: {}, trigger_reason: {}, {}", + "Region migration is aborted: {}, regions: {:?}, from_peer: {}, to_peer: {}, trigger_reason: {}, {}", self.reason, - ctx.region_id(), + ctx.persistent_ctx.region_ids, ctx.persistent_ctx.from_peer, ctx.persistent_ctx.to_peer, ctx.persistent_ctx.trigger_reason, diff --git a/src/meta-srv/src/procedure/region_migration/migration_start.rs b/src/meta-srv/src/procedure/region_migration/migration_start.rs index e544adbf4c..99d2972aa8 100644 --- a/src/meta-srv/src/procedure/region_migration/migration_start.rs +++ b/src/meta-srv/src/procedure/region_migration/migration_start.rs @@ -20,22 +20,18 @@ use common_procedure::{Context as ProcedureContext, Status}; use common_telemetry::info; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; -use store_api::storage::RegionId; use crate::error::{self, Result}; use crate::procedure::region_migration::migration_abort::RegionMigrationAbort; use crate::procedure::region_migration::migration_end::RegionMigrationEnd; use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion; -use crate::procedure::region_migration::update_metadata::UpdateMetadata; use crate::procedure::region_migration::{Context, State}; /// The behaviors: /// -/// If the expected leader region has been opened on `to_peer`, go to the [RegionMigrationEnd] state. -/// -/// If the candidate region has been opened on `to_peer`, go to the [UpdateMetadata::Downgrade] state. -/// -/// Otherwise go to the [OpenCandidateRegion] state. +/// - If all regions have been migrated, transitions to [RegionMigrationEnd]. +/// - If any of the region leaders is not the `from_peer`, transitions to [RegionMigrationAbort]. +/// - Otherwise, continues with [OpenCandidateRegion] to initiate the candidate region. 
#[derive(Debug, Serialize, Deserialize)] pub struct RegionMigrationStart; @@ -44,44 +40,62 @@ pub struct RegionMigrationStart; impl State for RegionMigrationStart { /// Yields next [State]. /// - /// If the expected leader region has been opened on `to_peer`, go to the [RegionMigrationEnd] state. + /// Determines the next [State] for region migration: /// - /// If the candidate region has been opened on `to_peer`, go to the [UpdateMetadata::Downgrade] state. - /// - /// Otherwise go to the [OpenCandidateRegion] state. + /// - If all regions have been migrated, transitions to [RegionMigrationEnd]. + /// - If any of the region leaders is not the `from_peer`, transitions to [RegionMigrationAbort]. + /// - Otherwise, continues with [OpenCandidateRegion] to initiate the candidate region. async fn next( &mut self, ctx: &mut Context, _procedure_ctx: &ProcedureContext, ) -> Result<(Box, Status)> { - let region_id = ctx.persistent_ctx.region_id; - let region_route = self.retrieve_region_route(ctx, region_id).await?; + let mut region_routes = self.retrieve_region_routes(ctx).await?; let to_peer = &ctx.persistent_ctx.to_peer; let from_peer = &ctx.persistent_ctx.from_peer; + let region_ids = &ctx.persistent_ctx.region_ids; - if self.has_migrated(®ion_route, to_peer)? { + self.filter_unmigrated_regions(&mut region_routes, to_peer); + + // No region to migrate, skip the migration. + if region_routes.is_empty() { info!( - "Region has been migrated, region: {:?}, to_peer: {:?}", - region_route.region.id, to_peer + "All regions have been migrated, regions: {:?}, to_peer: {:?}", + region_ids, to_peer ); - Ok((Box::new(RegionMigrationEnd), Status::done())) - } else if self.invalid_leader_peer(®ion_route, from_peer)? 
{ - info!( - "Abort region migration, region:{:?}, unexpected leader peer: {:?}, expected: {:?}", - region_route.region.id, region_route.leader_peer, from_peer, - ); - Ok(( - Box::new(RegionMigrationAbort::new(&format!( - "Invalid region leader peer: {from_peer:?}, expected: {:?}", - region_route.leader_peer.as_ref().unwrap(), - ))), - Status::done(), - )) - } else if self.check_candidate_region_on_peer(®ion_route, to_peer) { - Ok((Box::new(UpdateMetadata::Downgrade), Status::executing(true))) - } else { - Ok((Box::new(OpenCandidateRegion), Status::executing(true))) + return Ok((Box::new(RegionMigrationEnd), Status::done())); } + + // Updates the region ids to the unmigrated regions. + if region_routes.len() != region_ids.len() { + let unmigrated_region_ids = region_routes.iter().map(|route| route.region.id).collect(); + info!( + "Some of the regions have been migrated, only migrate the following regions: {:?}, to_peer: {:?}", + unmigrated_region_ids, to_peer + ); + ctx.persistent_ctx.region_ids = unmigrated_region_ids; + } + + // Checks if any of the region leaders is not the `from_peer`. + for region_route in ®ion_routes { + if self.invalid_leader_peer(region_route, from_peer)? { + info!( + "Abort region migration, region:{}, unexpected leader peer: {:?}, expected: {:?}", + region_route.region.id, region_route.leader_peer, from_peer, + ); + return Ok(( + Box::new(RegionMigrationAbort::new(&format!( + "Invalid region leader peer: {:?}, expected: {:?}", + region_route.leader_peer.as_ref().unwrap(), + from_peer, + ))), + Status::done(), + )); + } + } + + // If all checks pass, open the candidate region. + Ok((Box::new(OpenCandidateRegion), Status::executing(true))) } fn as_any(&self) -> &dyn Any { @@ -90,7 +104,7 @@ impl State for RegionMigrationStart { } impl RegionMigrationStart { - /// Retrieves region route. + /// Retrieves region routes for multiple regions. /// /// Abort(non-retry): /// - TableRoute is not found. 
@@ -98,39 +112,32 @@ impl RegionMigrationStart { /// /// Retry: /// - Failed to retrieve the metadata of table. - async fn retrieve_region_route( - &self, - ctx: &mut Context, - region_id: RegionId, - ) -> Result { - let table_id = region_id.table_id(); - let table_route = ctx.get_table_route_value().await?; + async fn retrieve_region_routes(&self, ctx: &mut Context) -> Result> { + let region_ids = &ctx.persistent_ctx.region_ids; + let table_route_values = ctx.get_table_route_values().await?; + let mut region_routes = Vec::with_capacity(region_ids.len()); + for region_id in region_ids { + let table_id = region_id.table_id(); + let region_route = table_route_values + .get(&table_id) + .context(error::TableRouteNotFoundSnafu { table_id })? + .region_routes() + .with_context(|_| error::UnexpectedLogicalRouteTableSnafu { + err_msg: format!("TableRoute({table_id:?}) is a non-physical TableRouteValue."), + })? + .iter() + .find(|route| route.region.id == *region_id) + .cloned() + .with_context(|| error::UnexpectedSnafu { + violated: format!( + "RegionRoute({}) is not found in TableRoute({})", + region_id, table_id + ), + })?; + region_routes.push(region_route); + } - let region_route = table_route - .region_routes() - .context(error::UnexpectedLogicalRouteTableSnafu { - err_msg: format!("{self:?} is a non-physical TableRouteValue."), - })? - .iter() - .find(|route| route.region.id == region_id) - .cloned() - .context(error::UnexpectedSnafu { - violated: format!( - "RegionRoute({}) is not found in TableRoute({})", - region_id, table_id - ), - })?; - - Ok(region_route) - } - - /// Checks whether the candidate region on region has been opened. - /// Returns true if it's been opened. - fn check_candidate_region_on_peer(&self, region_route: &RegionRoute, to_peer: &Peer) -> bool { - region_route - .follower_peers - .iter() - .any(|peer| peer.id == to_peer.id) + Ok(region_routes) } /// Returns true if the region leader is not the `from_peer`. 
@@ -143,7 +150,7 @@ impl RegionMigrationStart { let is_invalid_leader_peer = region_route .leader_peer .as_ref() - .context(error::UnexpectedSnafu { + .with_context(|| error::UnexpectedSnafu { violated: format!("Leader peer is not found in TableRoute({})", region_id), })? .id @@ -151,6 +158,12 @@ impl RegionMigrationStart { Ok(is_invalid_leader_peer) } + /// Retains only the regions that have not been migrated yet; a failed migration check keeps the region. + fn filter_unmigrated_regions(&self, region_routes: &mut Vec, to_peer: &Peer) { + region_routes + .retain(|region_route| !self.has_migrated(region_route, to_peer).unwrap_or(false)); + } + /// Checks whether the region has been migrated. /// Returns true if it's. /// @@ -162,7 +175,7 @@ impl RegionMigrationStart { let region_migrated = region_route .leader_peer .as_ref() - .context(error::UnexpectedSnafu { + .with_context(|| error::UnexpectedSnafu { violated: format!("Leader peer is not found in TableRoute({})", region_id), })? .id @@ -173,6 +186,7 @@ impl RegionMigrationStart { #[cfg(test)] mod tests { + use std::assert_matches::assert_matches; use common_meta::key::test_utils::new_test_table_info; @@ -183,7 +197,6 @@ mod tests { use super::*; use crate::error::Error; use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context}; - use crate::procedure::region_migration::update_metadata::UpdateMetadata; use crate::procedure::region_migration::{ContextFactory, PersistentContext}; fn new_persistent_context() -> PersistentContext { @@ -196,14 +209,8 @@ mod tests { let env = TestingEnv::new(); let persistent_context = new_persistent_context(); let mut ctx = env.context_factory().new_context(persistent_context); - - let err = state - .retrieve_region_route(&mut ctx, RegionId::new(1024, 1)) - .await - .unwrap_err(); - + let err = state.retrieve_region_routes(&mut ctx).await.unwrap_err(); assert_matches!(err, Error::TableRouteNotFound { ..
}); - assert!(!err.is_retryable()); } @@ -216,56 +223,20 @@ mod tests { let env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); - let table_info = new_test_table_info(1024, vec![1]).into(); + let table_info = new_test_table_info(1024, vec![3]).into(); let region_route = RegionRoute { - region: Region::new_test(RegionId::new(1024, 1)), + region: Region::new_test(RegionId::new(1024, 3)), leader_peer: Some(from_peer.clone()), ..Default::default() }; env.create_physical_table_metadata(table_info, vec![region_route]) .await; - - let err = state - .retrieve_region_route(&mut ctx, RegionId::new(1024, 3)) - .await - .unwrap_err(); - + let err = state.retrieve_region_routes(&mut ctx).await.unwrap_err(); assert_matches!(err, Error::Unexpected { .. }); assert!(!err.is_retryable()); } - #[tokio::test] - async fn test_next_update_metadata_downgrade_state() { - let mut state = Box::new(RegionMigrationStart); - // from_peer: 1 - // to_peer: 2 - let persistent_context = new_persistent_context(); - let from_peer_id = persistent_context.from_peer.id; - let to_peer = persistent_context.to_peer.clone(); - let region_id = persistent_context.region_id; - - let env = TestingEnv::new(); - let mut ctx = env.context_factory().new_context(persistent_context); - - let table_info = new_test_table_info(1024, vec![1]).into(); - let region_routes = vec![RegionRoute { - region: Region::new_test(region_id), - leader_peer: Some(Peer::empty(from_peer_id)), - follower_peers: vec![to_peer], - ..Default::default() - }]; - - env.create_physical_table_metadata(table_info, region_routes) - .await; - let procedure_ctx = new_procedure_context(); - let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap(); - - let update_metadata = next.as_any().downcast_ref::().unwrap(); - - assert_matches!(update_metadata, UpdateMetadata::Downgrade); - } - #[tokio::test] async fn test_next_migration_end_state() { let mut state = Box::new(RegionMigrationStart); @@ -274,7 
+245,7 @@ mod tests { let persistent_context = new_persistent_context(); let to_peer = persistent_context.to_peer.clone(); let from_peer = persistent_context.from_peer.clone(); - let region_id = persistent_context.region_id; + let region_id = persistent_context.region_ids[0]; let env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); @@ -302,7 +273,7 @@ mod tests { // to_peer: 2 let persistent_context = new_persistent_context(); let from_peer_id = persistent_context.from_peer.id; - let region_id = persistent_context.region_id; + let region_id = persistent_context.region_ids[0]; let env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); @@ -327,12 +298,12 @@ mod tests { // from_peer: 1 // to_peer: 2 let persistent_context = new_persistent_context(); - let region_id = persistent_context.region_id; + let region_id = persistent_context.region_ids[0]; let env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); let table_info = new_test_table_info(1024, vec![1]).into(); - let region_routes = vec![RegionRoute { + let region_routes: Vec = vec![RegionRoute { region: Region::new_test(region_id), leader_peer: Some(Peer::empty(1024)), ..Default::default() diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs index 111bd41fd2..67e1bfb857 100644 --- a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs @@ -66,33 +66,43 @@ impl OpenCandidateRegion { /// Abort(non-retry): /// - Datanode Table is not found. 
async fn build_open_region_instruction(&self, ctx: &mut Context) -> Result { - let pc = &ctx.persistent_ctx; - let table_id = pc.region_id.table_id(); - let region_number = pc.region_id.region_number(); - let candidate_id = pc.to_peer.id; - let datanode_table_value = ctx.get_from_peer_datanode_table_value().await?; + let region_ids = ctx.persistent_ctx.region_ids.clone(); + let from_peer_id = ctx.persistent_ctx.from_peer.id; + let to_peer_id = ctx.persistent_ctx.to_peer.id; + let datanode_table_values = ctx.get_from_peer_datanode_table_values().await?; + let mut open_regions = Vec::with_capacity(region_ids.len()); - let RegionInfo { - region_storage_path, - region_options, - region_wal_options, - engine, - } = datanode_table_value.region_info.clone(); - - let open_instruction = Instruction::OpenRegions(vec![OpenRegion::new( - RegionIdent { - datanode_id: candidate_id, - table_id, - region_number, + for region_id in region_ids { + let table_id = region_id.table_id(); + let region_number = region_id.region_number(); + let datanode_table_value = datanode_table_values.get(&table_id).context( + error::DatanodeTableNotFoundSnafu { + table_id, + datanode_id: from_peer_id, + }, + )?; + let RegionInfo { + region_storage_path, + region_options, + region_wal_options, engine, - }, - &region_storage_path, - region_options, - region_wal_options, - true, - )]); + } = datanode_table_value.region_info.clone(); - Ok(open_instruction) + open_regions.push(OpenRegion::new( + RegionIdent { + datanode_id: to_peer_id, + table_id, + region_number, + engine, + }, + &region_storage_path, + region_options, + region_wal_options, + true, + )); + } + + Ok(Instruction::OpenRegions(open_regions)) } /// Opens the candidate region. @@ -112,25 +122,27 @@ impl OpenCandidateRegion { ) -> Result<()> { let pc = &ctx.persistent_ctx; let vc = &mut ctx.volatile_ctx; - let region_id = pc.region_id; + let region_ids = &pc.region_ids; let candidate = &pc.to_peer; // This method might be invoked multiple times.
// Only registers the guard if `opening_region_guard` is absent. - if vc.opening_region_guard.is_none() { - // Registers the opening region. - let guard = ctx - .opening_region_keeper - .register(candidate.id, region_id) - .context(error::RegionOpeningRaceSnafu { - peer_id: candidate.id, - region_id, - })?; - vc.opening_region_guard = Some(guard); + if vc.opening_region_guards.is_empty() { + for region_id in region_ids { + // Registers the opening region. + let guard = ctx + .opening_region_keeper + .register(candidate.id, *region_id) + .context(error::RegionOpeningRaceSnafu { + peer_id: candidate.id, + region_id: *region_id, + })?; + vc.opening_region_guards.push(guard); + } } let msg = MailboxMessage::json_message( - &format!("Open candidate region: {}", region_id), + &format!("Open candidate regions: {:?}", region_ids), &format!("Metasrv@{}", ctx.server_addr()), &format!("Datanode-{}@{}", candidate.id, candidate.addr), common_time::util::current_time_millis(), @@ -154,9 +166,9 @@ impl OpenCandidateRegion { Ok(msg) => { let reply = HeartbeatMailbox::json_reply(&msg)?; info!( - "Received open region reply: {:?}, region: {}, elapsed: {:?}", + "Received open region reply: {:?}, region: {:?}, elapsed: {:?}", reply, - region_id, + region_ids, now.elapsed() ); let InstructionReply::OpenRegions(SimpleReply { result, error }) = reply else { @@ -172,7 +184,7 @@ impl OpenCandidateRegion { } else { error::RetryLaterSnafu { reason: format!( - "Region {region_id} is not opened by datanode {:?}, error: {error:?}, elapsed: {:?}", + "Region {region_ids:?} is not opened by datanode {:?}, error: {error:?}, elapsed: {:?}", candidate, now.elapsed() ), @@ -182,7 +194,7 @@ impl OpenCandidateRegion { } Err(error::Error::MailboxTimeout { .. 
}) => { let reason = format!( - "Mailbox received timeout for open candidate region {region_id} on datanode {:?}, elapsed: {:?}", + "Mailbox received timeout for open candidate region {region_ids:?} on datanode {:?}, elapsed: {:?}", candidate, now.elapsed() ); @@ -255,7 +267,7 @@ mod tests { // from_peer: 1 // to_peer: 2 let persistent_context = new_persistent_context(); - let region_id = persistent_context.region_id; + let region_id = persistent_context.region_ids[0]; let to_peer_id = persistent_context.to_peer.id; let env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); @@ -276,7 +288,7 @@ mod tests { // from_peer: 1 // to_peer: 2 let persistent_context = new_persistent_context(); - let region_id = persistent_context.region_id; + let region_id = persistent_context.region_ids[0]; let to_peer_id = persistent_context.to_peer.id; let env = TestingEnv::new(); @@ -302,7 +314,7 @@ mod tests { // from_peer: 1 // to_peer: 2 let persistent_context = new_persistent_context(); - let region_id = persistent_context.region_id; + let region_id = persistent_context.region_ids[0]; let to_peer_id = persistent_context.to_peer.id; let mut env = TestingEnv::new(); @@ -335,7 +347,7 @@ mod tests { // from_peer: 1 // to_peer: 2 let persistent_context = new_persistent_context(); - let region_id = persistent_context.region_id; + let region_id = persistent_context.region_ids[0]; let to_peer_id = persistent_context.to_peer.id; let mut env = TestingEnv::new(); @@ -370,7 +382,7 @@ mod tests { // from_peer: 1 // to_peer: 2 let persistent_context = new_persistent_context(); - let region_id = persistent_context.region_id; + let region_id = persistent_context.region_ids[0]; let to_peer_id = persistent_context.to_peer.id; let mut env = TestingEnv::new(); @@ -410,14 +422,14 @@ mod tests { // to_peer: 2 let persistent_context = new_persistent_context(); let from_peer_id = persistent_context.from_peer.id; - let region_id = persistent_context.region_id; + let 
region_id = persistent_context.region_ids[0]; let to_peer_id = persistent_context.to_peer.id; let mut env = TestingEnv::new(); // Prepares table let table_info = new_test_table_info(1024, vec![1]).into(); let region_routes = vec![RegionRoute { - region: Region::new_test(persistent_context.region_id), + region: Region::new_test(region_id), leader_peer: Some(Peer::empty(from_peer_id)), ..Default::default() }]; @@ -445,10 +457,7 @@ mod tests { let procedure_ctx = new_procedure_context(); let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap(); let vc = ctx.volatile_ctx; - assert_eq!( - vc.opening_region_guard.unwrap().info(), - (to_peer_id, region_id) - ); + assert_eq!(vc.opening_region_guards[0].info(), (to_peer_id, region_id)); let flush_leader_region = next.as_any().downcast_ref::().unwrap(); assert_matches!(flush_leader_region, PreFlushRegion); diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs index a44c3a20c6..375d17af89 100644 --- a/src/meta-srv/src/procedure/region_migration/test_util.rs +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -190,7 +190,7 @@ pub fn new_persistent_context(from: u64, to: u64, region_id: RegionId) -> Persis schema: "public".into(), from_peer: Peer::empty(from), to_peer: Peer::empty(to), - region_id, + region_ids: vec![region_id], timeout: Duration::from_secs(10), trigger_reason: RegionMigrationTriggerReason::default(), } @@ -306,37 +306,38 @@ impl ProcedureMigrationTestSuite { /// Verifies table metadata after region migration. 
pub(crate) async fn verify_table_metadata(&self) { - let region_id = self.context.persistent_ctx.region_id; - let table_route = self - .env - .table_metadata_manager - .table_route_manager() - .table_route_storage() - .get(region_id.table_id()) - .await - .unwrap() - .unwrap(); - let region_routes = table_route.region_routes().unwrap(); + for region_id in &self.context.persistent_ctx.region_ids { + let table_route = self + .env + .table_metadata_manager + .table_route_manager() + .table_route_storage() + .get(region_id.table_id()) + .await + .unwrap() + .unwrap(); + let region_routes = table_route.region_routes().unwrap(); - let expected_leader_id = self.context.persistent_ctx.to_peer.id; - let removed_follower_id = self.context.persistent_ctx.from_peer.id; + let expected_leader_id = self.context.persistent_ctx.to_peer.id; + let removed_follower_id = self.context.persistent_ctx.from_peer.id; - let region_route = region_routes - .iter() - .find(|route| route.region.id == region_id) - .unwrap(); - - assert!(!region_route.is_leader_downgrading()); - assert_eq!( - region_route.leader_peer.as_ref().unwrap().id, - expected_leader_id - ); - assert!( - !region_route - .follower_peers + let region_route = region_routes .iter() - .any(|route| route.id == removed_follower_id) - ) + .find(|route| route.region.id == *region_id) + .unwrap(); + + assert!(!region_route.is_leader_downgrading()); + assert_eq!( + region_route.leader_peer.as_ref().unwrap().id, + expected_leader_id + ); + assert!( + !region_route + .follower_peers + .iter() + .any(|route| route.id == removed_follower_id) + ) + } } } diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata.rs b/src/meta-srv/src/procedure/region_migration/update_metadata.rs index 8e7b2d4d3b..e96a025c5d 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata.rs @@ -18,7 +18,6 @@ pub(crate) mod upgrade_candidate_region; use std::any::Any; 
-use common_meta::lock_key::TableLock; use common_procedure::{Context as ProcedureContext, Status}; use common_telemetry::warn; use serde::{Deserialize, Serialize}; @@ -48,12 +47,10 @@ impl State for UpdateMetadata { ctx: &mut Context, procedure_ctx: &ProcedureContext, ) -> Result<(Box, Status)> { - let table_id = TableLock::Write(ctx.region_id().table_id()).into(); - let _guard = procedure_ctx.provider.acquire_lock(&table_id).await; - match self { UpdateMetadata::Downgrade => { - self.downgrade_leader_region(ctx).await?; + self.downgrade_leader_region(ctx, &procedure_ctx.provider) + .await?; Ok(( Box::::default(), @@ -61,7 +58,8 @@ impl State for UpdateMetadata { )) } UpdateMetadata::Upgrade => { - self.upgrade_candidate_region(ctx).await?; + self.upgrade_candidate_region(ctx, &procedure_ctx.provider) + .await?; if let Err(err) = ctx.invalidate_table_cache().await { warn!( @@ -71,7 +69,8 @@ impl State for UpdateMetadata { Ok((Box::new(CloseDowngradedRegion), Status::executing(false))) } UpdateMetadata::Rollback => { - self.rollback_downgraded_region(ctx).await?; + self.rollback_downgraded_region(ctx, &procedure_ctx.provider) + .await?; if let Err(err) = ctx.invalidate_table_cache().await { warn!( diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs index 77e5acbacd..05e29c9b08 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs @@ -13,7 +13,10 @@ // limitations under the License. 
use common_error::ext::BoxedError; +use common_meta::lock_key::TableLock; use common_meta::rpc::router::LeaderState; +use common_procedure::ContextProviderRef; +use common_telemetry::{error, info}; use snafu::ResultExt; use crate::error::{self, Result}; @@ -37,35 +40,46 @@ impl UpdateMetadata { /// It will only update **other region** info. Therefore, It's safe to retry after failure. /// /// - There is no other DDL procedure executed concurrently for the current table. - pub async fn downgrade_leader_region(&self, ctx: &mut Context) -> Result<()> { + pub async fn downgrade_leader_region( + &self, + ctx: &mut Context, + ctx_provider: &ContextProviderRef, + ) -> Result<()> { let table_metadata_manager = ctx.table_metadata_manager.clone(); let from_peer_id = ctx.persistent_ctx.from_peer.id; - let region_id = ctx.region_id(); - let table_id = region_id.table_id(); - let current_table_route_value = ctx.get_table_route_value().await?; + let table_regions = ctx.persistent_ctx.table_regions(); - // TODO(weny): ensures the leader region peer is the `from_peer`. 
- if let Err(err) = table_metadata_manager - .update_leader_region_status(table_id, &current_table_route_value, |route| { - if route.region.id == region_id - && route - .leader_peer - .as_ref() - .is_some_and(|leader_peer| leader_peer.id == from_peer_id) - { - Some(Some(LeaderState::Downgrading)) - } else { - None - } - }) - .await - .context(error::TableMetadataManagerSnafu) - { - return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu { - reason: format!( - "Failed to update the table route during the downgrading leader region, region_id: {region_id}, from_peer_id: {from_peer_id}" - ), - }); + for (table_id, regions) in table_regions { + let table_lock = TableLock::Write(table_id).into(); + let _guard = ctx_provider.acquire_lock(&table_lock).await; + + let current_table_route_value = ctx.get_table_route_value(table_id).await?; + if let Err(err) = table_metadata_manager + .update_leader_region_status(table_id, &current_table_route_value, |route| { + if regions.contains(&route.region.id) + && route + .leader_peer + .as_ref() + .is_some_and(|leader_peer| leader_peer.id == from_peer_id) + { + Some(Some(LeaderState::Downgrading)) + } else { + None + } + }) + .await + .context(error::TableMetadataManagerSnafu) + { + error!(err; "Failed to update the table route during the downgrading leader region, regions: {regions:?}, from_peer_id: {from_peer_id}"); + return Err(BoxedError::new(err)).with_context(|_| error::RetryLaterWithSourceSnafu { + reason: format!( + "Failed to update the table route during the downgrading leader region, regions: {regions:?}, from_peer_id: {from_peer_id}" + ), + }); + } + info!( + "Downgrading leader region table route success, table_id: {table_id}, regions: {regions:?}, from_peer_id: {from_peer_id}" + ); } Ok(()) @@ -75,10 +89,13 @@ impl UpdateMetadata { #[cfg(test)] mod tests { use std::assert_matches::assert_matches; + use std::collections::HashMap; + use std::sync::Arc; use common_meta::key::test_utils::new_test_table_info; use
common_meta::peer::Peer; use common_meta::rpc::router::{Region, RegionRoute}; + use common_procedure_test::MockContextProvider; use store_api::storage::RegionId; use crate::error::Error; @@ -104,8 +121,12 @@ mod tests { let env = TestingEnv::new(); let persistent_context = new_persistent_context(); let mut ctx = env.context_factory().new_context(persistent_context); + let provider = Arc::new(MockContextProvider::new(HashMap::new())) as _; - let err = state.downgrade_leader_region(&mut ctx).await.unwrap_err(); + let err = state + .downgrade_leader_region(&mut ctx, &provider) + .await + .unwrap_err(); assert_matches!(err, Error::TableRouteNotFound { .. }); @@ -119,7 +140,7 @@ mod tests { let env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); - let table_id = ctx.region_id().table_id(); + let table_id = ctx.persistent_ctx.region_ids[0].table_id(); let table_info = new_test_table_info(1024, vec![1, 2]).into(); let region_routes = vec![RegionRoute { @@ -162,7 +183,7 @@ mod tests { let env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); - let table_id = ctx.region_id().table_id(); + let table_id = ctx.persistent_ctx.region_ids[0].table_id(); let table_info = new_test_table_info(1024, vec![1, 2]).into(); let region_routes = vec![RegionRoute { diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs index 8f50e14b33..fc32e37672 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs @@ -13,6 +13,9 @@ // limitations under the License. 
use common_error::ext::BoxedError; +use common_meta::lock_key::TableLock; +use common_procedure::ContextProviderRef; +use common_telemetry::{error, info}; use snafu::ResultExt; use crate::error::{self, Result}; @@ -28,28 +31,39 @@ impl UpdateMetadata { /// Retry: /// - Failed to update [TableRouteValue](common_meta::key::table_region::TableRegionValue). /// - Failed to retrieve the metadata of table. - pub async fn rollback_downgraded_region(&self, ctx: &mut Context) -> Result<()> { + pub async fn rollback_downgraded_region( + &self, + ctx: &mut Context, + ctx_provider: &ContextProviderRef, + ) -> Result<()> { let table_metadata_manager = ctx.table_metadata_manager.clone(); - let region_id = ctx.region_id(); - let table_id = region_id.table_id(); - let current_table_route_value = ctx.get_table_route_value().await?; + let table_regions = ctx.persistent_ctx.table_regions(); - if let Err(err) = table_metadata_manager - .update_leader_region_status(table_id, &current_table_route_value, |route| { - if route.region.id == region_id { - Some(None) - } else { - None - } - }) - .await - .context(error::TableMetadataManagerSnafu) - { - return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu { - reason: format!("Failed to update the table route during the rollback downgraded leader region: {region_id}"), - }); + for (table_id, regions) in table_regions { + let table_lock = TableLock::Write(table_id).into(); + let _guard = ctx_provider.acquire_lock(&table_lock).await; + + let current_table_route_value = ctx.get_table_route_value(table_id).await?; + if let Err(err) = table_metadata_manager + .update_leader_region_status(table_id, &current_table_route_value, |route| { + if regions.contains(&route.region.id) { + Some(None) + } else { + None + } + }) + .await + .context(error::TableMetadataManagerSnafu) + { + error!(err; "Failed to update the table route during the rollback downgraded leader regions: {regions:?}"); + return Err(BoxedError::new(err)).with_context(|_|
error::RetryLaterWithSourceSnafu { + reason: format!("Failed to update the table route during the rollback downgraded leader regions: {regions:?}"), + }); + } + info!( + "Rolling back downgraded leader region table route success, table_id: {table_id}, regions: {regions:?}" + ); } - ctx.register_failure_detectors().await; Ok(()) @@ -59,10 +73,13 @@ impl UpdateMetadata { #[cfg(test)] mod tests { use std::assert_matches::assert_matches; + use std::collections::HashMap; + use std::sync::Arc; use common_meta::key::test_utils::new_test_table_info; use common_meta::peer::Peer; use common_meta::rpc::router::{LeaderState, Region, RegionRoute}; + use common_procedure_test::MockContextProvider; use store_api::storage::RegionId; use crate::error::Error; @@ -82,7 +99,11 @@ mod tests { let persistent_context = new_persistent_context(); let mut ctx = env.context_factory().new_context(persistent_context); - let err = state.downgrade_leader_region(&mut ctx).await.unwrap_err(); + let provider = Arc::new(MockContextProvider::new(HashMap::new())) as _; + let err = state + .rollback_downgraded_region(&mut ctx, &provider) + .await + .unwrap_err(); assert_matches!(err, Error::TableRouteNotFound { .. 
}); @@ -97,7 +118,7 @@ mod tests { let env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); - let table_id = ctx.region_id().table_id(); + let table_id = ctx.persistent_ctx.region_ids[0].table_id(); let table_info = new_test_table_info(1024, vec![1, 2, 3]).into(); let region_routes = vec![ diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs index 7e33c9c75c..0e545f5d92 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs @@ -14,9 +14,12 @@ use common_error::ext::BoxedError; use common_meta::key::datanode_table::RegionInfo; +use common_meta::lock_key::TableLock; use common_meta::rpc::router::{RegionRoute, region_distribution}; -use common_telemetry::{info, warn}; +use common_procedure::ContextProviderRef; +use common_telemetry::{error, info, warn}; use snafu::{OptionExt, ResultExt, ensure}; +use store_api::storage::RegionId; use crate::error::{self, Result}; use crate::procedure::region_migration::Context; @@ -24,104 +27,114 @@ use crate::procedure::region_migration::update_metadata::UpdateMetadata; impl UpdateMetadata { /// Returns new [Vec]. - async fn build_upgrade_candidate_region_metadata( + fn build_upgrade_candidate_region_metadata( &self, ctx: &mut Context, + region_ids: &[RegionId], + mut region_routes: Vec, ) -> Result> { - let region_id = ctx.region_id(); - let table_route_value = ctx.get_table_route_value().await?.clone(); + let old_leader_peer = &ctx.persistent_ctx.from_peer; + let new_leader_peer = &ctx.persistent_ctx.to_peer; + for region_id in region_ids { + // Find the RegionRoute for this region_id. 
+ let region_route = region_routes + .iter_mut() + .find(|route| route.region.id == *region_id) + .context(error::RegionRouteNotFoundSnafu { + region_id: *region_id, + })?; - let mut region_routes = table_route_value - .region_routes() - .context(error::UnexpectedLogicalRouteTableSnafu { - err_msg: format!("{self:?} is a non-physical TableRouteValue."), - })? - .clone(); - let region_route = region_routes - .iter_mut() - .find(|route| route.region.id == region_id) - .context(error::RegionRouteNotFoundSnafu { region_id })?; + // Remove any "downgraded leader" state. + region_route.set_leader_state(None); - // Removes downgraded status. - region_route.set_leader_state(None); - - let candidate = &ctx.persistent_ctx.to_peer; - let expected_old_leader = &ctx.persistent_ctx.from_peer; - - // Upgrades candidate to leader. - ensure!( - region_route - .leader_peer - .take_if(|old_leader| old_leader.id == expected_old_leader.id) - .is_some(), - error::UnexpectedSnafu { - violated: format!( - "Unexpected region leader: {:?} during the upgrading candidate metadata, expected: {:?}", - region_route.leader_peer, expected_old_leader - ), - } - ); - - region_route.leader_peer = Some(candidate.clone()); - info!( - "Upgrading candidate region to leader region: {:?} for region: {}", - candidate, region_id - ); - - // Removes the candidate region in followers. - let removed = region_route - .follower_peers - .extract_if(.., |peer| peer.id == candidate.id) - .collect::>(); - - if removed.len() > 1 { - warn!( - "Removes duplicated regions: {removed:?} during the upgrading candidate metadata for region: {region_id}" - ); - } - - Ok(region_routes) - } - - /// Returns true if region metadata has been updated. 
- async fn check_metadata_updated(&self, ctx: &mut Context) -> Result { - let region_id = ctx.region_id(); - let table_route_value = ctx.get_table_route_value().await?.clone(); - - let region_routes = table_route_value - .region_routes() - .context(error::UnexpectedLogicalRouteTableSnafu { - err_msg: format!("{self:?} is a non-physical TableRouteValue."), - })? - .clone(); - let region_route = region_routes - .into_iter() - .find(|route| route.region.id == region_id) - .context(error::RegionRouteNotFoundSnafu { region_id })?; - - let leader_peer = region_route - .leader_peer - .as_ref() - .context(error::UnexpectedSnafu { - violated: format!("The leader peer of region {region_id} is not found during the update metadata for upgrading"), - })?; - - let candidate_peer_id = ctx.persistent_ctx.to_peer.id; - - if leader_peer.id == candidate_peer_id { + // Check old leader matches expectation before upgrading to new leader. ensure!( - !region_route.is_leader_downgrading(), + region_route + .leader_peer + .take_if(|old_leader| old_leader.id == old_leader_peer.id) + .is_some(), error::UnexpectedSnafu { violated: format!( - "Unexpected intermediate state is found during the update metadata for upgrading region {region_id}" + "Unexpected region leader: {:?} during the candidate-to-leader upgrade; expected: {:?}", + region_route.leader_peer, old_leader_peer ), } ); - Ok(true) - } else { - Ok(false) + // Set new leader. + region_route.leader_peer = Some(new_leader_peer.clone()); + + // Remove new leader from followers (avoids duplicate leader/follower). + let removed = region_route + .follower_peers + .extract_if(.., |peer| peer.id == new_leader_peer.id) + .collect::>(); + + // Warn if more than one follower with the new leader id was present. 
+ if removed.len() > 1 { + warn!( + "Removed duplicate followers: {removed:?} during candidate-to-leader upgrade for region: {region_id}" + ); + } } + + info!( + "Building metadata for upgrading candidate region to new leader: {:?} for regions: {:?}", + new_leader_peer, region_ids, + ); + + Ok(region_routes) + } + + /// Checks if metadata has been upgraded for a list of regions by verifying if their + /// leader peers have been switched to a specified peer ID (`to_peer_id`) and that + /// no region is in a leader downgrading state. + /// + /// Returns: + /// - `Ok(true)` if all regions' leader is the target peer and no downgrading occurs. + /// - `Ok(false)` if any region's leader is not the target peer. + /// - Error if region route or leader peer cannot be found, or an unexpected state is detected. + fn check_metadata_updated( + &self, + ctx: &mut Context, + region_ids: &[RegionId], + region_routes: &[RegionRoute], + ) -> Result { + // Iterate through each provided region ID + for region_id in region_ids { + // Find the route info for this region + let region_route = region_routes + .iter() + .find(|route| route.region.id == *region_id) + .context(error::RegionRouteNotFoundSnafu { + region_id: *region_id, + })?; + + // Get the leader peer for the region, error if not found + let leader_peer = region_route.leader_peer.as_ref().with_context(||error::UnexpectedSnafu { + violated: format!( + "The leader peer of region {region_id} is not found during the metadata upgrade check" + ), + })?; + + // If the leader is not the expected peer, return false (i.e., not yet upgraded) + if leader_peer.id != ctx.persistent_ctx.to_peer.id { + return Ok(false); + } else { + // If leader matches but region is in leader downgrading state, error (unexpected state) + ensure!( + !region_route.is_leader_downgrading(), + error::UnexpectedSnafu { + violated: format!( + "Unexpected intermediate state is found during the metadata upgrade check for region {region_id}" + ), + } + ); + } + } + + 
// All regions' leader match expected peer and are not downgrading; considered upgraded + Ok(true) } /// Upgrades the candidate region. @@ -133,55 +146,77 @@ impl UpdateMetadata { /// Retry: /// - Failed to update [TableRouteValue](common_meta::key::table_region::TableRegionValue). /// - Failed to retrieve the metadata of table. - pub async fn upgrade_candidate_region(&self, ctx: &mut Context) -> Result<()> { - let region_id = ctx.region_id(); + pub async fn upgrade_candidate_region( + &self, + ctx: &mut Context, + ctx_provider: &ContextProviderRef, + ) -> Result<()> { let table_metadata_manager = ctx.table_metadata_manager.clone(); + let table_regions = ctx.persistent_ctx.table_regions(); + let from_peer_id = ctx.persistent_ctx.from_peer.id; + let to_peer_id = ctx.persistent_ctx.to_peer.id; - if self.check_metadata_updated(ctx).await? { - return Ok(()); + for (table_id, region_ids) in table_regions { + let table_lock = TableLock::Write(table_id).into(); + let _guard = ctx_provider.acquire_lock(&table_lock).await; + + let table_route_value = ctx.get_table_route_value(table_id).await?; + let region_routes = table_route_value.region_routes().with_context(|_| { + error::UnexpectedLogicalRouteTableSnafu { + err_msg: format!("TableRoute({table_id:?}) is a non-physical TableRouteValue."), + } + })?; + if self.check_metadata_updated(ctx, &region_ids, region_routes)? 
{ + continue; + } + let datanode_table_value = ctx.get_from_peer_datanode_table_value(table_id).await?; + let RegionInfo { + region_storage_path, + region_options, + region_wal_options, + engine, + } = datanode_table_value.region_info.clone(); + let new_region_routes = self.build_upgrade_candidate_region_metadata( + ctx, + &region_ids, + region_routes.clone(), + )?; + let region_distribution = region_distribution(region_routes); + info!( + "Trying to update region routes to {:?} for table: {}", + region_distribution, table_id, + ); + + if let Err(err) = table_metadata_manager + .update_table_route( + table_id, + RegionInfo { + engine: engine.clone(), + region_storage_path: region_storage_path.clone(), + region_options: region_options.clone(), + region_wal_options: region_wal_options.clone(), + }, + &table_route_value, + new_region_routes, + &region_options, + &region_wal_options, + ) + .await + .context(error::TableMetadataManagerSnafu) + { + error!(err; "Failed to update the table route during the upgrading candidate region: {region_ids:?}, from_peer_id: {from_peer_id}"); + return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu { + reason: format!("Failed to update the table route during the upgrading candidate region: {table_id}"), + }); + }; + info!( + "Upgrading candidate region table route success, table_id: {table_id}, regions: {region_ids:?}, to_peer_id: {to_peer_id}" + ); } - let region_routes = self.build_upgrade_candidate_region_metadata(ctx).await?; - let datanode_table_value = ctx.get_from_peer_datanode_table_value().await?; - let RegionInfo { - region_storage_path, - region_options, - region_wal_options, - engine, - } = datanode_table_value.region_info.clone(); - let table_route_value = ctx.get_table_route_value().await?; - - let region_distribution = region_distribution(&region_routes); - info!( - "Trying to update region routes to {:?} for table: {}", - region_distribution, - region_id.table_id() - ); - if let Err(err) = table_metadata_manager 
- .update_table_route( - region_id.table_id(), - RegionInfo { - engine: engine.clone(), - region_storage_path: region_storage_path.clone(), - region_options: region_options.clone(), - region_wal_options: region_wal_options.clone(), - }, - &table_route_value, - region_routes, - &region_options, - &region_wal_options, - ) - .await - .context(error::TableMetadataManagerSnafu) - { - return Err(BoxedError::new(err)).context(error::RetryLaterWithSourceSnafu { - reason: format!("Failed to update the table route during the upgrading candidate region: {region_id}"), - }); - }; - ctx.deregister_failure_detectors().await; // Consumes the guard. - ctx.volatile_ctx.opening_region_guard.take(); + ctx.volatile_ctx.opening_region_guards.clear(); Ok(()) } @@ -210,16 +245,11 @@ mod tests { #[tokio::test] async fn test_table_route_is_not_found_error() { - let state = UpdateMetadata::Upgrade; - let env = TestingEnv::new(); let persistent_context = new_persistent_context(); - let mut ctx = env.context_factory().new_context(persistent_context); + let ctx = env.context_factory().new_context(persistent_context); - let err = state - .build_upgrade_candidate_region_metadata(&mut ctx) - .await - .unwrap_err(); + let err = ctx.get_table_route_value(1024).await.unwrap_err(); assert_matches!(err, Error::TableRouteNotFound { .. }); assert!(!err.is_retryable()); @@ -238,13 +268,20 @@ mod tests { leader_peer: Some(Peer::empty(4)), ..Default::default() }]; - env.create_physical_table_metadata(table_info, region_routes) .await; + let table_route_value = ctx.get_table_route_value(1024).await.unwrap(); + let region_routes = table_route_value + .into_inner() + .into_physical_table_route() + .region_routes; let err = state - .build_upgrade_candidate_region_metadata(&mut ctx) - .await + .build_upgrade_candidate_region_metadata( + &mut ctx, + &[RegionId::new(1024, 1)], + region_routes, + ) .unwrap_err(); assert_matches!(err, Error::RegionRouteNotFound { .. 
}); @@ -268,9 +305,17 @@ mod tests { env.create_physical_table_metadata(table_info, region_routes) .await; + let table_route_value = ctx.get_table_route_value(1024).await.unwrap(); + let region_routes = table_route_value + .into_inner() + .into_physical_table_route() + .region_routes; let err = state - .build_upgrade_candidate_region_metadata(&mut ctx) - .await + .build_upgrade_candidate_region_metadata( + &mut ctx, + &[RegionId::new(1024, 1)], + region_routes, + ) .unwrap_err(); assert_matches!(err, Error::Unexpected { .. }); @@ -297,9 +342,17 @@ mod tests { env.create_physical_table_metadata(table_info, region_routes) .await; + let table_route_value = ctx.get_table_route_value(1024).await.unwrap(); + let region_routes = table_route_value + .into_inner() + .into_physical_table_route() + .region_routes; let new_region_routes = state - .build_upgrade_candidate_region_metadata(&mut ctx) - .await + .build_upgrade_candidate_region_metadata( + &mut ctx, + &[RegionId::new(1024, 1)], + region_routes, + ) .unwrap(); assert!(!new_region_routes[0].is_leader_downgrading()); @@ -327,8 +380,11 @@ mod tests { env.create_physical_table_metadata(table_info, region_routes) .await; - - let updated = state.check_metadata_updated(&mut ctx).await.unwrap(); + let table_routes = ctx.get_table_route_value(1024).await.unwrap(); + let region_routes = table_routes.region_routes().unwrap(); + let updated = state + .check_metadata_updated(&mut ctx, &[RegionId::new(1024, 1)], region_routes) + .unwrap(); assert!(!updated); } @@ -352,7 +408,11 @@ mod tests { env.create_physical_table_metadata(table_info, region_routes) .await; - let updated = state.check_metadata_updated(&mut ctx).await.unwrap(); + let table_routes = ctx.get_table_route_value(1024).await.unwrap(); + let region_routes = table_routes.region_routes().unwrap(); + let updated = state + .check_metadata_updated(&mut ctx, &[RegionId::new(1024, 1)], region_routes) + .unwrap(); assert!(updated); } @@ -376,7 +436,11 @@ mod tests { 
env.create_physical_table_metadata(table_info, region_routes) .await; - let err = state.check_metadata_updated(&mut ctx).await.unwrap_err(); + let table_routes = ctx.get_table_route_value(1024).await.unwrap(); + let region_routes = table_routes.region_routes().unwrap(); + let err = state + .check_metadata_updated(&mut ctx, &[RegionId::new(1024, 1)], region_routes) + .unwrap_err(); assert_matches!(err, Error::Unexpected { .. }); assert!(err.to_string().contains("intermediate state")); } @@ -401,7 +465,7 @@ mod tests { let guard = opening_keeper .register(2, RegionId::new(table_id, 1)) .unwrap(); - ctx.volatile_ctx.opening_region_guard = Some(guard); + ctx.volatile_ctx.opening_region_guards.push(guard); env.create_physical_table_metadata(table_info, region_routes) .await; @@ -425,7 +489,7 @@ mod tests { .unwrap(); let region_routes = table_route.region_routes().unwrap(); - assert!(ctx.volatile_ctx.opening_region_guard.is_none()); + assert!(ctx.volatile_ctx.opening_region_guards.is_empty()); assert_eq!(region_routes.len(), 1); assert!(!region_routes[0].is_leader_downgrading()); assert!(region_routes[0].follower_peers.is_empty()); diff --git a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs index 155130db41..f9000fa6a6 100644 --- a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs @@ -13,6 +13,7 @@ // limitations under the License. 
use std::any::Any; +use std::collections::HashSet; use std::time::Duration; use api::v1::meta::MailboxMessage; @@ -20,10 +21,11 @@ use common_meta::ddl::utils::parse_region_wal_options; use common_meta::instruction::{ Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply, UpgradeRegionsReply, }; +use common_meta::key::topic_region::TopicRegionKey; use common_meta::lock_key::RemoteWalLock; use common_meta::wal_options_allocator::extract_topic_from_wal_options; use common_procedure::{Context as ProcedureContext, Status}; -use common_telemetry::{error, warn}; +use common_telemetry::{error, info}; use common_wal::options::WalOptions; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt, ensure}; @@ -66,17 +68,9 @@ impl State for UpgradeCandidateRegion { ) -> Result<(Box<dyn State>, Status)> { let now = Instant::now(); - let region_wal_option = self.get_region_wal_option(ctx).await?; - let region_id = ctx.persistent_ctx.region_id; - if region_wal_option.is_none() { - warn!( - "Region {} wal options not found, during upgrade candidate region", - region_id - ); - } - + let topics = self.get_kafka_topics(ctx).await?; if self - .upgrade_region_with_retry(ctx, procedure_ctx, region_wal_option.as_ref()) + .upgrade_region_with_retry(ctx, procedure_ctx, topics) .await { ctx.update_upgrade_candidate_region_elapsed(now); @@ -93,24 +87,32 @@ impl State for UpgradeCandidateRegion { } impl UpgradeCandidateRegion { - async fn get_region_wal_option(&self, ctx: &mut Context) -> Result<Option<WalOptions>> { - let region_id = ctx.persistent_ctx.region_id; - match ctx.get_from_peer_datanode_table_value().await { - Ok(datanode_table_value) => { - let region_wal_options = - parse_region_wal_options(&datanode_table_value.region_info.region_wal_options) - .context(error::ParseWalOptionsSnafu)?; - Ok(region_wal_options.get(&region_id.region_number()).cloned()) + async fn get_kafka_topics(&self, ctx: &mut Context) -> Result<HashSet<String>> { + let table_regions = ctx.persistent_ctx.table_regions(); + let 
datanode_table_values = ctx.get_from_peer_datanode_table_values().await?; + let mut topics = HashSet::new(); + for (table_id, regions) in table_regions { + let Some(datanode_table_value) = datanode_table_values.get(&table_id) else { + continue; + }; + + let region_wal_options = + parse_region_wal_options(&datanode_table_value.region_info.region_wal_options) + .context(error::ParseWalOptionsSnafu)?; + + for region_id in regions { + let Some(WalOptions::Kafka(kafka_wal_options)) = + region_wal_options.get(&region_id.region_number()) + else { + continue; + }; + if !topics.contains(&kafka_wal_options.topic) { + topics.insert(kafka_wal_options.topic.clone()); + } } - Err(error::Error::DatanodeTableNotFound { datanode_id, .. }) => { - warn!( - "Datanode table not found, during upgrade candidate region, the target region might already been migrated, region_id: {}, datanode_id: {}", - region_id, datanode_id - ); - Ok(None) - } - Err(e) => Err(e), } + + Ok(topics) } /// Builds upgrade region instruction. @@ -119,35 +121,105 @@ impl UpgradeCandidateRegion { ctx: &mut Context, replay_timeout: Duration, ) -> Result<Instruction> { - let pc = &ctx.persistent_ctx; - let region_id = pc.region_id; - let last_entry_id = ctx.volatile_ctx.leader_region_last_entry_id; - let metadata_last_entry_id = ctx.volatile_ctx.leader_region_metadata_last_entry_id; - // Try our best to retrieve replay checkpoint. 
- let datanode_table_value = ctx.get_from_peer_datanode_table_value().await.ok(); - let checkpoint = if let Some(topic) = datanode_table_value.as_ref().and_then(|v| { - extract_topic_from_wal_options(region_id, &v.region_info.region_wal_options) - }) { - ctx.fetch_replay_checkpoint(&topic).await.ok().flatten() - } else { - None - }; + let region_ids = ctx.persistent_ctx.region_ids.clone(); + let datanode_table_values = ctx.get_from_peer_datanode_table_values().await?; + let mut region_topic = Vec::with_capacity(region_ids.len()); + for region_id in region_ids.iter() { + let table_id = region_id.table_id(); + if let Some(datanode_table_value) = datanode_table_values.get(&table_id) + && let Some(topic) = extract_topic_from_wal_options( + *region_id, + &datanode_table_value.region_info.region_wal_options, + ) + { + region_topic.push((*region_id, topic)); + } + } - let upgrade_instruction = Instruction::UpgradeRegions(vec![ - UpgradeRegion { + let replay_checkpoints = ctx + .get_replay_checkpoints( + region_topic + .iter() + .map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic)) + .collect(), + ) + .await?; + // Build upgrade regions instruction. 
+ let mut upgrade_regions = Vec::with_capacity(region_ids.len()); + for region_id in region_ids { + let last_entry_id = ctx + .volatile_ctx + .leader_region_last_entry_ids + .get(&region_id) + .copied(); + let metadata_last_entry_id = ctx + .volatile_ctx + .leader_region_metadata_last_entry_ids + .get(&region_id) + .copied(); + let checkpoint = replay_checkpoints.get(&region_id).copied(); + upgrade_regions.push(UpgradeRegion { region_id, last_entry_id, metadata_last_entry_id, replay_timeout, location_id: Some(ctx.persistent_ctx.from_peer.id), - replay_entry_id: None, - metadata_replay_entry_id: None, - } - .with_replay_entry_id(checkpoint.map(|c| c.entry_id)) - .with_metadata_replay_entry_id(checkpoint.and_then(|c| c.metadata_entry_id)), - ]); + replay_entry_id: checkpoint.map(|c| c.entry_id), + metadata_replay_entry_id: checkpoint.and_then(|c| c.metadata_entry_id), + }); + } - Ok(upgrade_instruction) + Ok(Instruction::UpgradeRegions(upgrade_regions)) + } + + fn handle_upgrade_region_reply( + &self, + ctx: &mut Context, + UpgradeRegionReply { + region_id, + ready, + exists, + error, + }: &UpgradeRegionReply, + now: &Instant, + ) -> Result<()> { + let candidate = &ctx.persistent_ctx.to_peer; + if error.is_some() { + return error::RetryLaterSnafu { + reason: format!( + "Failed to upgrade the region {} on datanode {:?}, error: {:?}, elapsed: {:?}", + region_id, + candidate, + error, + now.elapsed() + ), + } + .fail(); + } + + ensure!( + exists, + error::UnexpectedSnafu { + violated: format!( + "Candidate region {} doesn't exist on datanode {:?}", + region_id, candidate + ) + } + ); + + if self.require_ready && !ready { + return error::RetryLaterSnafu { + reason: format!( + "Candidate region {} still replaying the wal on datanode {:?}, elapsed: {:?}", + region_id, + candidate, + now.elapsed() + ), + } + .fail(); + } + + Ok(()) } /// Tries to upgrade a candidate region. 
@@ -175,11 +247,11 @@ impl UpgradeCandidateRegion { .await?; let pc = &ctx.persistent_ctx; - let region_id = pc.region_id; + let region_ids = &pc.region_ids; let candidate = &pc.to_peer; let msg = MailboxMessage::json_message( - &format!("Upgrade candidate region: {}", region_id), + &format!("Upgrade candidate regions: {:?}", region_ids), &format!("Metasrv@{}", ctx.server_addr()), &format!("Datanode-{}@{}", candidate.id, candidate.addr), common_time::util::current_time_millis(), @@ -192,9 +264,16 @@ impl UpgradeCandidateRegion { let ch = Channel::Datanode(candidate.id); let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?; + let now = Instant::now(); match receiver.await { Ok(msg) => { let reply = HeartbeatMailbox::json_reply(&msg)?; + info!( + "Received upgrade region reply: {:?}, regions: {:?}, elapsed: {:?}", + reply, + region_ids, + now.elapsed() + ); let InstructionReply::UpgradeRegions(UpgradeRegionsReply { replies }) = reply else { return error::UnexpectedInstructionReplySnafu { @@ -203,51 +282,16 @@ impl UpgradeCandidateRegion { } .fail(); }; - // TODO(weny): handle multiple replies. - let UpgradeRegionReply { - ready, - exists, - error, - .. - } = &replies[0]; - - // Notes: The order of handling is important. - if error.is_some() { - return error::RetryLaterSnafu { - reason: format!( - "Failed to upgrade the region {} on datanode {:?}, error: {:?}", - region_id, candidate, error - ), - } - .fail(); + for reply in replies { + self.handle_upgrade_region_reply(ctx, &reply, &now)?; } - - ensure!( - exists, - error::UnexpectedSnafu { - violated: format!( - "Candidate region {} doesn't exist on datanode {:?}", - region_id, candidate - ) - } - ); - - if self.require_ready && !ready { - return error::RetryLaterSnafu { - reason: format!( - "Candidate region {} still replaying the wal on datanode {:?}", - region_id, candidate - ), - } - .fail(); - } - Ok(()) } Err(error::Error::MailboxTimeout { .. 
}) => { let reason = format!( - "Mailbox received timeout for upgrade candidate region {region_id} on datanode {:?}", + "Mailbox received timeout for upgrade candidate regions {region_ids:?} on datanode {:?}, elapsed: {:?}", candidate, + now.elapsed() ); error::RetryLaterSnafu { reason }.fail() } @@ -262,26 +306,24 @@ impl UpgradeCandidateRegion { &self, ctx: &mut Context, procedure_ctx: &ProcedureContext, - wal_options: Option<&WalOptions>, + topics: HashSet, ) -> bool { let mut retry = 0; let mut upgraded = false; + let mut guards = Vec::with_capacity(topics.len()); loop { let timer = Instant::now(); // If using Kafka WAL, acquire a read lock on the topic to prevent WAL pruning during the upgrade. - let _guard = if let Some(WalOptions::Kafka(kafka_wal_options)) = wal_options { - Some( + for topic in &topics { + guards.push( procedure_ctx .provider - .acquire_lock( - &(RemoteWalLock::Read(kafka_wal_options.topic.clone()).into()), - ) + .acquire_lock(&(RemoteWalLock::Read(topic.clone()).into())) .await, - ) - } else { - None - }; + ); + } + if let Err(err) = self.upgrade_region(ctx).await { retry += 1; ctx.update_operations_elapsed(timer); @@ -332,17 +374,17 @@ mod tests { schema: "public".into(), from_peer: Peer::empty(1), to_peer: Peer::empty(2), - region_id: RegionId::new(1024, 1), + region_ids: vec![RegionId::new(1024, 1)], timeout: Duration::from_millis(1000), trigger_reason: RegionMigrationTriggerReason::Manual, } } async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap) { - let table_info = - new_test_table_info(ctx.persistent_ctx.region_id.table_id(), vec![1]).into(); + let region_id = ctx.persistent_ctx.region_ids[0]; + let table_info = new_test_table_info(region_id.table_id(), vec![1]).into(); let region_routes = vec![RegionRoute { - region: Region::new_test(ctx.persistent_ctx.region_id), + region: Region::new_test(region_id), leader_peer: Some(ctx.persistent_ctx.from_peer.clone()), follower_peers: vec![ctx.persistent_ctx.to_peer.clone()], 
..Default::default()