From 69865c831d1578565580b8de14c58ab9ffe4cbb4 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Wed, 26 Nov 2025 16:31:56 +0800 Subject: [PATCH] feat: batch region migration for failover (#7245) * refactor: support multiple rows per event in event recorder Signed-off-by: WenyXu * feat: batch region migration for failover Signed-off-by: WenyXu * chore: apply suggestions from CR Signed-off-by: WenyXu * test: add tests Signed-off-by: WenyXu * chore: apply suggestions from CR * chore: apply suggestions from CR Signed-off-by: WenyXu * fix: fix unit tests Signed-off-by: WenyXu * chore: apply suggestions from CR --------- Signed-off-by: WenyXu --- src/common/event-recorder/src/recorder.rs | 26 +- src/common/frontend/src/slow_query_event.rs | 6 +- src/common/procedure/src/event.rs | 103 +++- .../src/events/region_migration_event.rs | 71 +-- .../src/procedure/region_migration.rs | 65 ++- .../downgrade_leader_region.rs | 17 +- .../src/procedure/region_migration/manager.rs | 354 ++++++++++++- .../procedure/region_migration/test_util.rs | 17 +- .../upgrade_candidate_region.rs | 17 +- .../src/procedure/region_migration/utils.rs | 487 ++++++++++++++++++ src/meta-srv/src/region/supervisor.rs | 326 ++++++++++-- 11 files changed, 1331 insertions(+), 158 deletions(-) create mode 100644 src/meta-srv/src/procedure/region_migration/utils.rs diff --git a/src/common/event-recorder/src/recorder.rs b/src/common/event-recorder/src/recorder.rs index ddf0bcdae0..ace7702991 100644 --- a/src/common/event-recorder/src/recorder.rs +++ b/src/common/event-recorder/src/recorder.rs @@ -97,9 +97,9 @@ pub trait Event: Send + Sync + Debug { vec![] } - /// Add the extra row to the event with the default row. - fn extra_row(&self) -> Result { - Ok(Row { values: vec![] }) + /// Add the extra rows to the event with the default row. + fn extra_rows(&self) -> Result> { + Ok(vec![Row { values: vec![] }]) } /// Returns the event as any type. 
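Note (illustrative, not part of the patch): the hunk above generalizes `Event::extra_row` into `Event::extra_rows`, so one logical event can fan out into several stored rows; the next hunk flattens those rows into the insert request, prepending the shared event type, payload, and timestamp columns to each row. A minimal sketch of an implementor using the new hook follows; the event and its column are hypothetical, while the trait and helper types (`common_event_recorder::Event`, `api::v1::{Row, ValueData, ColumnSchema, ...}`) are the ones used elsewhere in this patch.

use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType};
use common_event_recorder::Event;
use common_event_recorder::error::Result;

/// A hypothetical event that records one row per affected region.
#[derive(Debug)]
struct RegionBatchEvent {
    region_ids: Vec<u64>,
}

impl Event for RegionBatchEvent {
    fn event_type(&self) -> &str {
        "region_batch_event"
    }

    fn extra_schema(&self) -> Vec<ColumnSchema> {
        vec![ColumnSchema {
            column_name: "region_id".to_string(),
            datatype: ColumnDataType::Uint64.into(),
            semantic_type: SemanticType::Field.into(),
            ..Default::default()
        }]
    }

    // One row per region id; `build_row_inserts_request` prepends the common
    // event columns to each of these rows.
    fn extra_rows(&self) -> Result<Vec<Row>> {
        Ok(self
            .region_ids
            .iter()
            .map(|id| Row {
                values: vec![ValueData::U64Value(*id).into()],
            })
            .collect())
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }
}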
@@ -159,15 +159,17 @@ pub fn build_row_inserts_request(events: &[&Box]) -> Result = Vec::with_capacity(events.len()); for event in events { - let extra_row = event.extra_row()?; - let mut values = Vec::with_capacity(3 + extra_row.values.len()); - values.extend([ - ValueData::StringValue(event.event_type().to_string()).into(), - ValueData::BinaryValue(event.json_payload()?.into_bytes()).into(), - ValueData::TimestampNanosecondValue(event.timestamp().value()).into(), - ]); - values.extend(extra_row.values); - rows.push(Row { values }); + let extra_rows = event.extra_rows()?; + for extra_row in extra_rows { + let mut values = Vec::with_capacity(3 + extra_row.values.len()); + values.extend([ + ValueData::StringValue(event.event_type().to_string()).into(), + ValueData::BinaryValue(event.json_payload()?.into_bytes()).into(), + ValueData::TimestampNanosecondValue(event.timestamp().value()).into(), + ]); + values.extend(extra_row.values); + rows.push(Row { values }); + } } Ok(RowInsertRequests { diff --git a/src/common/frontend/src/slow_query_event.rs b/src/common/frontend/src/slow_query_event.rs index 0e65443acb..32ca457da4 100644 --- a/src/common/frontend/src/slow_query_event.rs +++ b/src/common/frontend/src/slow_query_event.rs @@ -107,8 +107,8 @@ impl Event for SlowQueryEvent { ] } - fn extra_row(&self) -> Result { - Ok(Row { + fn extra_rows(&self) -> Result> { + Ok(vec![Row { values: vec![ ValueData::U64Value(self.cost).into(), ValueData::U64Value(self.threshold).into(), @@ -119,7 +119,7 @@ impl Event for SlowQueryEvent { ValueData::TimestampMillisecondValue(self.promql_start.unwrap_or(0)).into(), ValueData::TimestampMillisecondValue(self.promql_end.unwrap_or(0)).into(), ], - }) + }]) } fn json_payload(&self) -> Result { diff --git a/src/common/procedure/src/event.rs b/src/common/procedure/src/event.rs index bc76de7842..d659236369 100644 --- a/src/common/procedure/src/event.rs +++ b/src/common/procedure/src/event.rs @@ -92,25 +92,96 @@ impl Event for ProcedureEvent { schema } - fn extra_row(&self) -> Result { - let error_str = match &self.state { - ProcedureState::Failed { error } => format!("{:?}", error), - ProcedureState::PrepareRollback { error } => format!("{:?}", error), - ProcedureState::RollingBack { error } => format!("{:?}", error), - ProcedureState::Retrying { error } => format!("{:?}", error), - ProcedureState::Poisoned { error, .. } => format!("{:?}", error), - _ => "".to_string(), - }; - let mut row = vec![ - ValueData::StringValue(self.procedure_id.to_string()).into(), - ValueData::StringValue(self.state.as_str_name().to_string()).into(), - ValueData::StringValue(error_str).into(), - ]; - row.append(&mut self.internal_event.extra_row()?.values); - Ok(Row { values: row }) + fn extra_rows(&self) -> Result> { + let mut internal_event_extra_rows = self.internal_event.extra_rows()?; + let mut rows = Vec::with_capacity(internal_event_extra_rows.len()); + for internal_event_extra_row in internal_event_extra_rows.iter_mut() { + let error_str = match &self.state { + ProcedureState::Failed { error } => format!("{:?}", error), + ProcedureState::PrepareRollback { error } => format!("{:?}", error), + ProcedureState::RollingBack { error } => format!("{:?}", error), + ProcedureState::Retrying { error } => format!("{:?}", error), + ProcedureState::Poisoned { error, .. 
} => format!("{:?}", error), + _ => "".to_string(), + }; + let mut values = Vec::with_capacity(3 + internal_event_extra_row.values.len()); + values.extend([ + ValueData::StringValue(self.procedure_id.to_string()).into(), + ValueData::StringValue(self.state.as_str_name().to_string()).into(), + ValueData::StringValue(error_str).into(), + ]); + values.append(&mut internal_event_extra_row.values); + rows.push(Row { values }); + } + + Ok(rows) } fn as_any(&self) -> &dyn Any { self } } + +#[cfg(test)] +mod tests { + use api::v1::value::ValueData; + use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType}; + use common_event_recorder::Event; + + use crate::{ProcedureEvent, ProcedureId, ProcedureState}; + + #[derive(Debug)] + struct TestEvent; + + impl Event for TestEvent { + fn event_type(&self) -> &str { + "test_event" + } + + fn extra_schema(&self) -> Vec { + vec![ColumnSchema { + column_name: "test_event_column".to_string(), + datatype: ColumnDataType::String.into(), + semantic_type: SemanticType::Field.into(), + ..Default::default() + }] + } + + fn extra_rows(&self) -> common_event_recorder::error::Result> { + Ok(vec![ + Row { + values: vec![ValueData::StringValue("test_event1".to_string()).into()], + }, + Row { + values: vec![ValueData::StringValue("test_event2".to_string()).into()], + }, + ]) + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } + } + + #[test] + fn test_procedure_event_extra_rows() { + let procedure_event = ProcedureEvent::new( + ProcedureId::random(), + Box::new(TestEvent {}), + ProcedureState::Running, + ); + + let procedure_event_extra_rows = procedure_event.extra_rows().unwrap(); + assert_eq!(procedure_event_extra_rows.len(), 2); + assert_eq!(procedure_event_extra_rows[0].values.len(), 4); + assert_eq!( + procedure_event_extra_rows[0].values[3], + ValueData::StringValue("test_event1".to_string()).into() + ); + assert_eq!(procedure_event_extra_rows[1].values.len(), 4); + assert_eq!( + procedure_event_extra_rows[1].values[3], + ValueData::StringValue("test_event2".to_string()).into() + ); + } +} diff --git a/src/meta-srv/src/events/region_migration_event.rs b/src/meta-srv/src/events/region_migration_event.rs index e5b81d5585..7e5c5b6fc2 100644 --- a/src/meta-srv/src/events/region_migration_event.rs +++ b/src/meta-srv/src/events/region_migration_event.rs @@ -21,7 +21,7 @@ use common_event_recorder::Event; use common_event_recorder::error::{Result, SerializeEventSnafu}; use serde::Serialize; use snafu::ResultExt; -use store_api::storage::{RegionId, TableId}; +use store_api::storage::RegionId; use crate::procedure::region_migration::{PersistentContext, RegionMigrationTriggerReason}; @@ -37,37 +37,34 @@ pub const EVENTS_TABLE_DST_NODE_ID_COLUMN_NAME: &str = "region_migration_dst_nod pub const EVENTS_TABLE_DST_PEER_ADDR_COLUMN_NAME: &str = "region_migration_dst_peer_addr"; /// RegionMigrationEvent is the event of region migration. -#[derive(Debug, Serialize)] +#[derive(Debug)] pub(crate) struct RegionMigrationEvent { - #[serde(skip)] - region_id: RegionId, - #[serde(skip)] - table_id: TableId, - #[serde(skip)] - region_number: u32, - #[serde(skip)] + // The region ids of the region migration. + region_ids: Vec, + // The trigger reason of the region migration. trigger_reason: RegionMigrationTriggerReason, - #[serde(skip)] + // The source node id of the region migration. src_node_id: u64, - #[serde(skip)] + // The source peer address of the region migration. src_peer_addr: String, - #[serde(skip)] + // The destination node id of the region migration. 
dst_node_id: u64, - #[serde(skip)] + // The destination peer address of the region migration. dst_peer_addr: String, + // The timeout of the region migration. + timeout: Duration, +} - // The following fields will be serialized as the json payload. +#[derive(Debug, Serialize)] +struct Payload { + #[serde(with = "humantime_serde")] timeout: Duration, } impl RegionMigrationEvent { pub fn from_persistent_ctx(ctx: &PersistentContext) -> Self { - // FIXME(weny): handle multiple region ids. - let region_id = ctx.region_ids[0]; Self { - region_id, - table_id: region_id.table_id(), - region_number: region_id.region_number(), + region_ids: ctx.region_ids.clone(), trigger_reason: ctx.trigger_reason, src_node_id: ctx.from_peer.id, src_peer_addr: ctx.from_peer.addr.clone(), @@ -136,23 +133,31 @@ impl Event for RegionMigrationEvent { ] } - fn extra_row(&self) -> Result { - Ok(Row { - values: vec![ - ValueData::U64Value(self.region_id.as_u64()).into(), - ValueData::U32Value(self.table_id).into(), - ValueData::U32Value(self.region_number).into(), - ValueData::StringValue(self.trigger_reason.to_string()).into(), - ValueData::U64Value(self.src_node_id).into(), - ValueData::StringValue(self.src_peer_addr.clone()).into(), - ValueData::U64Value(self.dst_node_id).into(), - ValueData::StringValue(self.dst_peer_addr.clone()).into(), - ], - }) + fn extra_rows(&self) -> Result> { + let mut extra_rows = Vec::with_capacity(self.region_ids.len()); + for region_id in &self.region_ids { + extra_rows.push(Row { + values: vec![ + ValueData::U64Value(region_id.as_u64()).into(), + ValueData::U32Value(region_id.table_id()).into(), + ValueData::U32Value(region_id.region_number()).into(), + ValueData::StringValue(self.trigger_reason.to_string()).into(), + ValueData::U64Value(self.src_node_id).into(), + ValueData::StringValue(self.src_peer_addr.clone()).into(), + ValueData::U64Value(self.dst_node_id).into(), + ValueData::StringValue(self.dst_peer_addr.clone()).into(), + ], + }); + } + + Ok(extra_rows) } fn json_payload(&self) -> Result { - serde_json::to_string(self).context(SerializeEventSnafu) + serde_json::to_string(&Payload { + timeout: self.timeout, + }) + .context(SerializeEventSnafu) } fn as_any(&self) -> &dyn Any { diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index 5f710adcd3..3613fd0894 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -24,6 +24,7 @@ pub(crate) mod open_candidate_region; pub mod test_util; pub(crate) mod update_metadata; pub(crate) mod upgrade_candidate_region; +pub(crate) mod utils; use std::any::Any; use std::collections::{HashMap, HashSet}; @@ -100,9 +101,16 @@ where #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct PersistentContext { /// The table catalog. - pub(crate) catalog: String, + #[deprecated(note = "use `catalog_and_schema` instead")] + #[serde(default, skip_serializing_if = "Option::is_none")] + pub(crate) catalog: Option, /// The table schema. - pub(crate) schema: String, + #[deprecated(note = "use `catalog_and_schema` instead")] + #[serde(default, skip_serializing_if = "Option::is_none")] + pub(crate) schema: Option, + /// The catalog and schema of the regions. + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub(crate) catalog_and_schema: Vec<(String, String)>, /// The [Peer] of migration source. pub(crate) from_peer: Peer, /// The [Peer] of migration destination. 
@@ -118,15 +126,47 @@ pub struct PersistentContext { pub(crate) trigger_reason: RegionMigrationTriggerReason, } +impl PersistentContext { + pub fn new( + catalog_and_schema: Vec<(String, String)>, + from_peer: Peer, + to_peer: Peer, + region_ids: Vec, + timeout: Duration, + trigger_reason: RegionMigrationTriggerReason, + ) -> Self { + #[allow(deprecated)] + Self { + catalog: None, + schema: None, + catalog_and_schema, + from_peer, + to_peer, + region_ids, + timeout, + trigger_reason, + } + } +} + fn default_timeout() -> Duration { Duration::from_secs(10) } impl PersistentContext { pub fn lock_key(&self) -> Vec { - let mut lock_keys = Vec::with_capacity(self.region_ids.len() + 2); - lock_keys.push(CatalogLock::Read(&self.catalog).into()); - lock_keys.push(SchemaLock::read(&self.catalog, &self.schema).into()); + let mut lock_keys = + Vec::with_capacity(self.region_ids.len() + 2 + self.catalog_and_schema.len() * 2); + #[allow(deprecated)] + if let (Some(catalog), Some(schema)) = (&self.catalog, &self.schema) { + lock_keys.push(CatalogLock::Read(catalog).into()); + lock_keys.push(SchemaLock::read(catalog, schema).into()); + } + for (catalog, schema) in self.catalog_and_schema.iter() { + lock_keys.push(CatalogLock::Read(catalog).into()); + lock_keys.push(SchemaLock::read(catalog, schema).into()); + } + // Sort the region ids to ensure the same order of region ids. let mut region_ids = self.region_ids.clone(); region_ids.sort_unstable(); @@ -928,13 +968,24 @@ mod tests { let procedure = RegionMigrationProcedure::new(persistent_context, context, vec![]); let serialized = procedure.dump().unwrap(); - let expected = r#"{"persistent_ctx":{"catalog":"greptime","schema":"public","from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_ids":[4398046511105],"timeout":"10s","trigger_reason":"Unknown"},"state":{"region_migration_state":"RegionMigrationStart"}}"#; + let expected = r#"{"persistent_ctx":{"catalog_and_schema":[["greptime","public"]],"from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_ids":[4398046511105],"timeout":"10s","trigger_reason":"Unknown"},"state":{"region_migration_state":"RegionMigrationStart"}}"#; assert_eq!(expected, serialized); } #[test] fn test_backward_compatibility() { - let persistent_ctx = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1)); + let persistent_ctx = PersistentContext { + #[allow(deprecated)] + catalog: Some("greptime".into()), + #[allow(deprecated)] + schema: Some("public".into()), + catalog_and_schema: vec![], + from_peer: Peer::empty(1), + to_peer: Peer::empty(2), + region_ids: vec![RegionId::new(1024, 1)], + timeout: Duration::from_secs(10), + trigger_reason: RegionMigrationTriggerReason::default(), + }; // NOTES: Changes it will break backward compatibility. 
let serialized = r#"{"catalog":"greptime","schema":"public","from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105}"#; let deserialized: PersistentContext = serde_json::from_str(serialized).unwrap(); diff --git a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs index 5cc8f064fb..d10220098f 100644 --- a/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/downgrade_leader_region.rs @@ -389,15 +389,14 @@ mod tests { }; fn new_persistent_context() -> PersistentContext { - PersistentContext { - catalog: "greptime".into(), - schema: "public".into(), - from_peer: Peer::empty(1), - to_peer: Peer::empty(2), - region_ids: vec![RegionId::new(1024, 1)], - timeout: Duration::from_millis(1000), - trigger_reason: RegionMigrationTriggerReason::Manual, - } + PersistentContext::new( + vec![("greptime".into(), "public".into())], + Peer::empty(1), + Peer::empty(2), + vec![RegionId::new(1024, 1)], + Duration::from_millis(1000), + RegionMigrationTriggerReason::Manual, + ) } async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap) { diff --git a/src/meta-srv/src/procedure/region_migration/manager.rs b/src/meta-srv/src/procedure/region_migration/manager.rs index 782d2fe7ec..fcd8f7a6e6 100644 --- a/src/meta-srv/src/procedure/region_migration/manager.rs +++ b/src/meta-srv/src/procedure/region_migration/manager.rs @@ -12,8 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; use std::collections::hash_map::Entry; +use std::collections::{HashMap, HashSet}; use std::fmt::Display; use std::sync::{Arc, RwLock}; use std::time::Duration; @@ -31,6 +31,9 @@ use table::table_name::TableName; use crate::error::{self, Result}; use crate::metrics::{METRIC_META_REGION_MIGRATION_DATANODES, METRIC_META_REGION_MIGRATION_FAIL}; +use crate::procedure::region_migration::utils::{ + RegionMigrationAnalysis, RegionMigrationTaskBatch, analyze_region_migration_task, +}; use crate::procedure::region_migration::{ DefaultContextFactory, PersistentContext, RegionMigrationProcedure, }; @@ -99,6 +102,7 @@ impl Drop for RegionMigrationProcedureGuard { } } +/// A task of region migration procedure. #[derive(Debug, Clone)] pub struct RegionMigrationProcedureTask { pub(crate) region_id: RegionId, @@ -151,6 +155,25 @@ impl Display for RegionMigrationProcedureTask { } } +/// The result of submitting a region migration task. +#[derive(Debug, Default, PartialEq, Eq)] +pub struct SubmitRegionMigrationTaskResult { + /// Regions already migrated to the `to_peer`. + pub migrated: Vec, + /// Regions where the leader peer has changed. + pub leader_changed: Vec, + /// Regions where `to_peer` is already a follower (conflict). + pub peer_conflict: Vec, + /// Regions whose table is not found. + pub table_not_found: Vec, + /// Regions still pending migration. + pub migrating: Vec, + /// Regions that have been submitted for migration. + pub submitted: Vec, + /// The procedure id of the region migration procedure. + pub procedure_id: Option, +} + impl RegionMigrationManager { /// Returns new [`RegionMigrationManager`] pub(crate) fn new( @@ -332,6 +355,168 @@ impl RegionMigrationManager { Ok(()) } + /// Extracts regions from the migration task that are already running migration procedures. 
+ /// + /// Returns a tuple containing those region ids that are already running and the newly created procedure guards. + /// The regions that are already running will be removed from the [`RegionMigrationTask`]. + fn extract_running_regions( + &self, + task: &mut RegionMigrationTaskBatch, + ) -> (Vec, Vec) { + let mut migrating_region_ids = Vec::new(); + let mut procedure_guards = Vec::with_capacity(task.region_ids.len()); + + for region_id in &task.region_ids { + let Some(guard) = self.insert_running_procedure(&RegionMigrationProcedureTask::new( + *region_id, + task.from_peer.clone(), + task.to_peer.clone(), + task.timeout, + task.trigger_reason, + )) else { + migrating_region_ids.push(*region_id); + continue; + }; + procedure_guards.push(guard); + } + + let migrating_set = migrating_region_ids.iter().cloned().collect::>(); + task.region_ids.retain(|id| !migrating_set.contains(id)); + + (migrating_region_ids, procedure_guards) + } + + pub async fn submit_region_migration_task( + &self, + mut task: RegionMigrationTaskBatch, + ) -> Result { + let (migrating_region_ids, procedure_guards) = self.extract_running_regions(&mut task); + let RegionMigrationAnalysis { + migrated, + leader_changed, + peer_conflict, + mut table_not_found, + pending, + } = analyze_region_migration_task(&task, &self.context_factory.table_metadata_manager) + .await?; + if pending.is_empty() { + return Ok(SubmitRegionMigrationTaskResult { + migrated, + leader_changed, + peer_conflict, + table_not_found, + migrating: migrating_region_ids, + submitted: vec![], + procedure_id: None, + }); + } + + // Updates the region ids to the pending region ids. + task.region_ids = pending; + let table_regions = task.table_regions(); + let table_ids = table_regions.keys().cloned().collect::>(); + let table_info_values = self + .context_factory + .table_metadata_manager + .table_info_manager() + .batch_get(&table_ids) + .await + .context(error::TableMetadataManagerSnafu)?; + let mut catalog_and_schema = Vec::with_capacity(table_info_values.len()); + for (table_id, regions) in table_regions { + match table_info_values.get(&table_id) { + Some(table_info) => { + let TableName { + catalog_name, + schema_name, + .. 
+ } = table_info.table_name(); + catalog_and_schema.push((catalog_name, schema_name)); + } + None => { + task.region_ids.retain(|id| id.table_id() != table_id); + table_not_found.extend(regions); + } + } + } + if task.region_ids.is_empty() { + return Ok(SubmitRegionMigrationTaskResult { + migrated, + leader_changed, + peer_conflict, + table_not_found, + migrating: migrating_region_ids, + submitted: vec![], + procedure_id: None, + }); + } + + let submitting_region_ids = task.region_ids.clone(); + let procedure_id = self + .submit_procedure_inner(task, procedure_guards, catalog_and_schema) + .await?; + Ok(SubmitRegionMigrationTaskResult { + migrated, + leader_changed, + peer_conflict, + table_not_found, + migrating: migrating_region_ids, + submitted: submitting_region_ids, + procedure_id: Some(procedure_id), + }) + } + + async fn submit_procedure_inner( + &self, + task: RegionMigrationTaskBatch, + procedure_guards: Vec, + catalog_and_schema: Vec<(String, String)>, + ) -> Result { + let procedure = RegionMigrationProcedure::new( + PersistentContext::new( + catalog_and_schema, + task.from_peer.clone(), + task.to_peer.clone(), + task.region_ids.clone(), + task.timeout, + task.trigger_reason, + ), + self.context_factory.clone(), + procedure_guards, + ); + let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); + let procedure_id = procedure_with_id.id; + info!("Starting region migration procedure {procedure_id} for {task}"); + let procedure_manager = self.procedure_manager.clone(); + let num_region = task.region_ids.len(); + + common_runtime::spawn_global(async move { + let watcher = &mut match procedure_manager.submit(procedure_with_id).await { + Ok(watcher) => watcher, + Err(e) => { + error!(e; "Failed to submit region migration procedure {procedure_id} for {task}"); + return; + } + }; + METRIC_META_REGION_MIGRATION_DATANODES + .with_label_values(&["src", &task.from_peer.id.to_string()]) + .inc_by(num_region as u64); + METRIC_META_REGION_MIGRATION_DATANODES + .with_label_values(&["desc", &task.to_peer.id.to_string()]) + .inc_by(num_region as u64); + + if let Err(e) = watcher::wait(watcher).await { + error!(e; "Failed to wait region migration procedure {procedure_id} for {task}"); + METRIC_META_REGION_MIGRATION_FAIL.inc(); + return; + } + + info!("Region migration procedure {procedure_id} for {task} is finished successfully!"); + }); + + Ok(procedure_id) + } + /// Submits a new region migration procedure. pub async fn submit_procedure( &self, @@ -384,15 +569,14 @@ impl RegionMigrationManager { trigger_reason, } = task.clone(); let procedure = RegionMigrationProcedure::new( - PersistentContext { - catalog: catalog_name, - schema: schema_name, - region_ids: vec![region_id], + PersistentContext::new( + vec![(catalog_name, schema_name)], from_peer, to_peer, + vec![region_id], timeout, trigger_reason, - }, + ), self.context_factory.clone(), vec![guard], ); @@ -645,4 +829,162 @@ mod test { assert_matches!(err, error::Error::Unexpected { .. 
}); } + + #[tokio::test] + async fn test_submit_procedure_with_multiple_regions_invalid_task() { + let env = TestingEnv::new(); + let context_factory = env.context_factory(); + let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory); + let task = RegionMigrationTaskBatch { + region_ids: vec![RegionId::new(1024, 1)], + from_peer: Peer::empty(1), + to_peer: Peer::empty(1), + timeout: Duration::from_millis(1000), + trigger_reason: RegionMigrationTriggerReason::Manual, + }; + + let err = manager + .submit_region_migration_task(task) + .await + .unwrap_err(); + assert_matches!(err, error::Error::InvalidArguments { .. }); + } + + #[tokio::test] + async fn test_submit_procedure_with_multiple_regions_no_region_to_migrate() { + common_telemetry::init_default_ut_logging(); + let env = TestingEnv::new(); + let context_factory = env.context_factory(); + let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory); + let region_id = RegionId::new(1024, 1); + let task = RegionMigrationTaskBatch { + region_ids: vec![region_id], + from_peer: Peer::empty(1), + to_peer: Peer::empty(2), + timeout: Duration::from_millis(1000), + trigger_reason: RegionMigrationTriggerReason::Manual, + }; + let table_info = new_test_table_info(1024, vec![1]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(Peer::empty(2)), + ..Default::default() + }]; + env.create_physical_table_metadata(table_info, region_routes) + .await; + let result = manager.submit_region_migration_task(task).await.unwrap(); + + assert_eq!( + result, + SubmitRegionMigrationTaskResult { + migrated: vec![region_id], + ..Default::default() + } + ); + } + + #[tokio::test] + async fn test_submit_procedure_with_multiple_regions_leader_peer_changed() { + let env = TestingEnv::new(); + let context_factory = env.context_factory(); + let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory); + let region_id = RegionId::new(1024, 1); + let task = RegionMigrationTaskBatch { + region_ids: vec![region_id], + from_peer: Peer::empty(1), + to_peer: Peer::empty(2), + timeout: Duration::from_millis(1000), + trigger_reason: RegionMigrationTriggerReason::Manual, + }; + + let table_info = new_test_table_info(1024, vec![1]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(RegionId::new(1024, 1)), + leader_peer: Some(Peer::empty(3)), + ..Default::default() + }]; + + env.create_physical_table_metadata(table_info, region_routes) + .await; + let result = manager.submit_region_migration_task(task).await.unwrap(); + assert_eq!( + result, + SubmitRegionMigrationTaskResult { + leader_changed: vec![region_id], + ..Default::default() + } + ); + } + + #[tokio::test] + async fn test_submit_procedure_with_multiple_regions_peer_conflict() { + let env = TestingEnv::new(); + let context_factory = env.context_factory(); + let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory); + let region_id = RegionId::new(1024, 1); + let task = RegionMigrationTaskBatch { + region_ids: vec![region_id], + from_peer: Peer::empty(3), + to_peer: Peer::empty(2), + timeout: Duration::from_millis(1000), + trigger_reason: RegionMigrationTriggerReason::Manual, + }; + + let table_info = new_test_table_info(1024, vec![1]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(Peer::empty(3)), + follower_peers: vec![Peer::empty(2)], + 
..Default::default() + }]; + + env.create_physical_table_metadata(table_info, region_routes) + .await; + let result = manager.submit_region_migration_task(task).await.unwrap(); + assert_eq!( + result, + SubmitRegionMigrationTaskResult { + peer_conflict: vec![region_id], + ..Default::default() + } + ); + } + + #[tokio::test] + async fn test_running_regions() { + let env = TestingEnv::new(); + let context_factory = env.context_factory(); + let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory); + let region_id = RegionId::new(1024, 1); + let task = RegionMigrationTaskBatch { + region_ids: vec![region_id, RegionId::new(1024, 2)], + from_peer: Peer::empty(1), + to_peer: Peer::empty(2), + timeout: Duration::from_millis(1000), + trigger_reason: RegionMigrationTriggerReason::Manual, + }; + // Inserts one + manager.tracker.running_procedures.write().unwrap().insert( + region_id, + RegionMigrationProcedureTask::new( + region_id, + task.from_peer.clone(), + task.to_peer.clone(), + task.timeout, + task.trigger_reason, + ), + ); + let table_info = new_test_table_info(1024, vec![1]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(RegionId::new(1024, 2)), + leader_peer: Some(Peer::empty(1)), + ..Default::default() + }]; + env.create_physical_table_metadata(table_info, region_routes) + .await; + let result = manager.submit_region_migration_task(task).await.unwrap(); + assert_eq!(result.migrating, vec![region_id]); + assert_eq!(result.submitted, vec![RegionId::new(1024, 2)]); + assert!(result.procedure_id.is_some()); + } } diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs index 375d17af89..c039fc441d 100644 --- a/src/meta-srv/src/procedure/region_migration/test_util.rs +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -185,15 +185,14 @@ impl TestingEnv { /// Generates a [PersistentContext]. pub fn new_persistent_context(from: u64, to: u64, region_id: RegionId) -> PersistentContext { - PersistentContext { - catalog: "greptime".into(), - schema: "public".into(), - from_peer: Peer::empty(from), - to_peer: Peer::empty(to), - region_ids: vec![region_id], - timeout: Duration::from_secs(10), - trigger_reason: RegionMigrationTriggerReason::default(), - } + PersistentContext::new( + vec![("greptime".into(), "public".into())], + Peer::empty(from), + Peer::empty(to), + vec![region_id], + Duration::from_secs(10), + RegionMigrationTriggerReason::default(), + ) } /// The test suite for region migration procedure. 
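Illustrative sketch (mirrors the unit tests above, not additional production code): how a caller inside `meta-srv` might drive the new batched submission API and branch on the per-region outcome buckets. The `manager` is assumed to be a `RegionMigrationManager` built the same way the tests build it via `TestingEnv`.

use std::time::Duration;

use common_meta::peer::Peer;
use store_api::storage::RegionId;

use crate::error::Result;
use crate::procedure::region_migration::RegionMigrationTriggerReason;
use crate::procedure::region_migration::manager::RegionMigrationManager;
use crate::procedure::region_migration::utils::RegionMigrationTaskBatch;

async fn submit_failover_batch(manager: &RegionMigrationManager) -> Result<()> {
    let task = RegionMigrationTaskBatch {
        region_ids: vec![RegionId::new(1024, 1), RegionId::new(1024, 2)],
        from_peer: Peer::empty(1),
        to_peer: Peer::empty(2),
        timeout: Duration::from_secs(10),
        trigger_reason: RegionMigrationTriggerReason::Failover,
    };

    let result = manager.submit_region_migration_task(task).await?;
    // Regions that are already migrated, whose leader changed, whose table is
    // gone, whose target peer conflicts, or that are still migrating are
    // reported back instead of being resubmitted.
    if !result.submitted.is_empty() {
        // `procedure_id` is `Some` iff at least one region was actually submitted.
        assert!(result.procedure_id.is_some());
    }
    Ok(())
}

Only regions in `submitted` end up in a procedure; the other buckets let the caller clean up failure detectors or retry later, which is what the supervisor changes further below do.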
diff --git a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs index f9000fa6a6..0390ddf0da 100644 --- a/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/upgrade_candidate_region.rs @@ -369,15 +369,14 @@ mod tests { }; fn new_persistent_context() -> PersistentContext { - PersistentContext { - catalog: "greptime".into(), - schema: "public".into(), - from_peer: Peer::empty(1), - to_peer: Peer::empty(2), - region_ids: vec![RegionId::new(1024, 1)], - timeout: Duration::from_millis(1000), - trigger_reason: RegionMigrationTriggerReason::Manual, - } + PersistentContext::new( + vec![("greptime".into(), "public".into())], + Peer::empty(1), + Peer::empty(2), + vec![RegionId::new(1024, 1)], + Duration::from_millis(1000), + RegionMigrationTriggerReason::Manual, + ) } async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap) { diff --git a/src/meta-srv/src/procedure/region_migration/utils.rs b/src/meta-srv/src/procedure/region_migration/utils.rs new file mode 100644 index 0000000000..09921ee0d6 --- /dev/null +++ b/src/meta-srv/src/procedure/region_migration/utils.rs @@ -0,0 +1,487 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::fmt::Display; +use std::time::Duration; + +use common_meta::key::TableMetadataManagerRef; +use common_meta::peer::Peer; +use common_meta::rpc::router::RegionRoute; +use itertools::Itertools; +use snafu::{OptionExt, ResultExt}; +use store_api::storage::{RegionId, TableId}; + +use crate::error::{self, Result}; +use crate::procedure::region_migration::{ + DEFAULT_REGION_MIGRATION_TIMEOUT, RegionMigrationProcedureTask, RegionMigrationTriggerReason, +}; + +/// A migration task describing how regions are intended to move between peers. +#[derive(Debug, Clone)] +pub struct RegionMigrationTaskBatch { + /// Region ids involved in this migration. + pub region_ids: Vec, + /// Source peer where regions currently reside. + pub from_peer: Peer, + /// Destination peer to migrate regions to. + pub to_peer: Peer, + /// Timeout for migration. + pub timeout: Duration, + /// Reason why this migration was triggered. + pub trigger_reason: RegionMigrationTriggerReason, +} + +impl RegionMigrationTaskBatch { + /// Constructs a [`RegionMigrationTaskBatch`] from a vector of region migration procedure tasks. + /// + /// Aggregates region IDs, determines source and destination peers, sets an appropriate timeout, + /// and assigns the trigger reason for the migration batch. + /// + /// # Panic + /// if the `tasks` are empty. 
+ pub fn from_tasks(tasks: Vec<(RegionMigrationProcedureTask, u32)>) -> Self { + let max_count = tasks.iter().map(|(_, count)| *count).max().unwrap_or(1); + let region_ids = tasks.iter().map(|(r, _)| r.region_id).collect::>(); + let from_peer = tasks[0].0.from_peer.clone(); + let to_peer = tasks[0].0.to_peer.clone(); + let timeout = DEFAULT_REGION_MIGRATION_TIMEOUT * max_count; + let trigger_reason = RegionMigrationTriggerReason::Failover; + Self { + region_ids, + from_peer, + to_peer, + timeout, + trigger_reason, + } + } +} + +impl Display for RegionMigrationTaskBatch { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "RegionMigrationTask {{ region_ids: {:?}, from_peer: {:?}, to_peer: {:?}, timeout: {:?}, trigger_reason: {:?} }}", + self.region_ids, self.from_peer, self.to_peer, self.timeout, self.trigger_reason + ) + } +} + +impl RegionMigrationTaskBatch { + /// Returns the table regions map. + /// + /// The key is the table id, the value is the region ids of the table. + pub(crate) fn table_regions(&self) -> HashMap> { + let mut table_regions = HashMap::new(); + for region_id in &self.region_ids { + table_regions + .entry(region_id.table_id()) + .or_insert_with(Vec::new) + .push(*region_id); + } + table_regions + } +} + +/// Represents the result of analyzing a migration task. +#[derive(Debug, Clone, Default, PartialEq)] +pub(crate) struct RegionMigrationAnalysis { + /// Regions already migrated to the `to_peer`. + pub(crate) migrated: Vec, + /// Regions where the leader peer has changed. + pub(crate) leader_changed: Vec, + /// Regions where `to_peer` is already a follower (conflict). + pub(crate) peer_conflict: Vec, + /// Regions whose table is not found. + pub(crate) table_not_found: Vec, + /// Regions still pending migration. + pub(crate) pending: Vec, +} + +fn leader_peer(region_route: &RegionRoute) -> Result<&Peer> { + region_route + .leader_peer + .as_ref() + .with_context(|| error::UnexpectedSnafu { + violated: format!( + "Region route leader peer is not found in region({})", + region_route.region.id + ), + }) +} + +/// Returns true if the region has already been migrated to `to_peer`. +fn has_migrated(region_route: &RegionRoute, to_peer_id: u64) -> Result { + if region_route.is_leader_downgrading() { + return Ok(false); + } + + let leader_peer = leader_peer(region_route)?; + Ok(leader_peer.id == to_peer_id) +} + +/// Returns true if the leader peer of the region has changed. +fn has_leader_changed(region_route: &RegionRoute, from_peer_id: u64) -> Result { + let leader_peer = leader_peer(region_route)?; + + Ok(leader_peer.id != from_peer_id) +} + +/// Returns true if `to_peer` is already a follower of the region (conflict). +fn has_peer_conflict(region_route: &RegionRoute, to_peer_id: u64) -> bool { + region_route + .follower_peers + .iter() + .map(|p| p.id) + .contains(&to_peer_id) +} + +/// Updates the verification result based on a single region route. +fn update_result_with_region_route( + result: &mut RegionMigrationAnalysis, + region_route: &RegionRoute, + from_peer_id: u64, + to_peer_id: u64, +) -> Result<()> { + if has_migrated(region_route, to_peer_id)? { + result.migrated.push(region_route.region.id); + return Ok(()); + } + if has_leader_changed(region_route, from_peer_id)? 
{ + result.leader_changed.push(region_route.region.id); + return Ok(()); + } + if has_peer_conflict(region_route, to_peer_id) { + result.peer_conflict.push(region_route.region.id); + return Ok(()); + } + result.pending.push(region_route.region.id); + Ok(()) +} + +/// Analyzes the migration task and categorizes regions by their current state. +/// +/// Returns a [`RegionMigrationAnalysis`] describing the migration status. +pub async fn analyze_region_migration_task( + task: &RegionMigrationTaskBatch, + table_metadata_manager: &TableMetadataManagerRef, +) -> Result { + if task.to_peer.id == task.from_peer.id { + return error::InvalidArgumentsSnafu { + err_msg: format!( + "The `from_peer_id`({}) can't equal `to_peer_id`({})", + task.from_peer.id, task.to_peer.id + ), + } + .fail(); + } + let table_regions = task.table_regions(); + let table_ids = table_regions.keys().cloned().collect::>(); + let mut result = RegionMigrationAnalysis::default(); + + let table_routes = table_metadata_manager + .table_route_manager() + .table_route_storage() + .batch_get_with_raw_bytes(&table_ids) + .await + .context(error::TableMetadataManagerSnafu)?; + + for (table_id, table_route) in table_ids.into_iter().zip(table_routes) { + let region_ids = table_regions.get(&table_id).unwrap(); + let Some(table_route) = table_route else { + result.table_not_found.extend(region_ids); + continue; + }; + // Throws error if the table route is not a physical table route. + let region_routes = table_route.region_routes().with_context(|_| { + error::UnexpectedLogicalRouteTableSnafu { + err_msg: format!("TableRoute({table_id:?}) is a non-physical TableRouteValue."), + } + })?; + for region_route in region_routes + .iter() + .filter(|r| region_ids.contains(&r.region.id)) + { + update_result_with_region_route( + &mut result, + region_route, + task.from_peer.id, + task.to_peer.id, + )?; + } + } + + Ok(result) +} + +#[cfg(test)] +mod tests { + + use std::assert_matches::assert_matches; + use std::sync::Arc; + use std::time::Duration; + + use common_meta::key::TableMetadataManager; + use common_meta::key::table_route::{ + LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteValue, + }; + use common_meta::kv_backend::TxnService; + use common_meta::kv_backend::memory::MemoryKvBackend; + use common_meta::peer::Peer; + use common_meta::rpc::router::{Region, RegionRoute}; + use store_api::storage::RegionId; + + use crate::error::Error; + use crate::procedure::region_migration::RegionMigrationTriggerReason; + use crate::procedure::region_migration::utils::{ + RegionMigrationAnalysis, RegionMigrationTaskBatch, analyze_region_migration_task, + update_result_with_region_route, + }; + + #[test] + fn test_update_result_with_region_route() { + // The region is already migrated to the to_peer. + let mut result = RegionMigrationAnalysis::default(); + let region_id = RegionId::new(1, 1); + let region_route = RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(Peer::empty(1)), + follower_peers: vec![], + leader_state: None, + leader_down_since: None, + }; + update_result_with_region_route(&mut result, ®ion_route, 2, 1).unwrap(); + assert_eq!( + result, + RegionMigrationAnalysis { + migrated: vec![region_id], + ..Default::default() + } + ); + + // Test region leader changed. 
+ let mut result = RegionMigrationAnalysis::default(); + let region_id = RegionId::new(1, 1); + let region_route = RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(Peer::empty(1)), + follower_peers: vec![], + leader_state: None, + leader_down_since: None, + }; + update_result_with_region_route(&mut result, ®ion_route, 2, 3).unwrap(); + assert_eq!( + result, + RegionMigrationAnalysis { + leader_changed: vec![region_id], + ..Default::default() + } + ); + + // Test region peer conflict. + let mut result = RegionMigrationAnalysis::default(); + let region_id = RegionId::new(1, 1); + let region_route = RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(Peer::empty(1)), + follower_peers: vec![Peer::empty(2)], + leader_state: None, + leader_down_since: None, + }; + update_result_with_region_route(&mut result, ®ion_route, 1, 2).unwrap(); + assert_eq!( + result, + RegionMigrationAnalysis { + peer_conflict: vec![region_id], + ..Default::default() + } + ); + + // Test normal case. + let mut result = RegionMigrationAnalysis::default(); + let region_id = RegionId::new(1, 1); + let region_route = RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(Peer::empty(1)), + follower_peers: vec![], + leader_state: None, + leader_down_since: None, + }; + update_result_with_region_route(&mut result, ®ion_route, 1, 3).unwrap(); + assert_eq!( + result, + RegionMigrationAnalysis { + pending: vec![region_id], + ..Default::default() + } + ); + + // Test leader peer not set + let mut result = RegionMigrationAnalysis::default(); + let region_id = RegionId::new(1, 1); + let region_route = RegionRoute { + region: Region::new_test(region_id), + leader_peer: None, + follower_peers: vec![], + leader_state: None, + leader_down_since: None, + }; + let err = update_result_with_region_route(&mut result, ®ion_route, 1, 3).unwrap_err(); + assert_matches!(err, Error::Unexpected { .. }); + } + + #[tokio::test] + async fn test_analyze_region_migration_task_invalid_task() { + let task = &RegionMigrationTaskBatch { + region_ids: vec![RegionId::new(1, 1)], + from_peer: Peer::empty(1), + to_peer: Peer::empty(1), + timeout: Duration::from_millis(1000), + trigger_reason: RegionMigrationTriggerReason::Manual, + }; + let kv_backend = Arc::new(MemoryKvBackend::default()); + let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); + let err = analyze_region_migration_task(task, &table_metadata_manager) + .await + .unwrap_err(); + assert_matches!(err, Error::InvalidArguments { .. 
}); + } + + #[tokio::test] + async fn test_analyze_region_migration_table_not_found() { + let task = &RegionMigrationTaskBatch { + region_ids: vec![RegionId::new(1, 1)], + from_peer: Peer::empty(1), + to_peer: Peer::empty(2), + timeout: Duration::from_millis(1000), + trigger_reason: RegionMigrationTriggerReason::Manual, + }; + let kv_backend = Arc::new(MemoryKvBackend::default()); + let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); + let result = analyze_region_migration_task(task, &table_metadata_manager) + .await + .unwrap(); + assert_eq!( + result, + RegionMigrationAnalysis { + table_not_found: vec![RegionId::new(1, 1)], + ..Default::default() + } + ); + } + + #[tokio::test] + async fn test_analyze_region_migration_unexpected_logical_table() { + let kv_backend = Arc::new(MemoryKvBackend::default()); + let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); + let (txn, _) = table_metadata_manager + .table_route_manager() + .table_route_storage() + .build_create_txn( + 1024, + &TableRouteValue::Logical(LogicalTableRouteValue::new( + 1024, + vec![RegionId::new(1023, 1)], + )), + ) + .unwrap(); + kv_backend.txn(txn).await.unwrap(); + let task = &RegionMigrationTaskBatch { + region_ids: vec![RegionId::new(1024, 1)], + from_peer: Peer::empty(1), + to_peer: Peer::empty(2), + timeout: Duration::from_millis(1000), + trigger_reason: RegionMigrationTriggerReason::Manual, + }; + let err = analyze_region_migration_task(task, &table_metadata_manager) + .await + .unwrap_err(); + assert_matches!(err, Error::UnexpectedLogicalRouteTable { .. }); + } + + #[tokio::test] + async fn test_analyze_region_migration_normal_case() { + let kv_backend = Arc::new(MemoryKvBackend::default()); + let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); + let (txn, _) = table_metadata_manager + .table_route_manager() + .table_route_storage() + .build_create_txn( + 1024, + &TableRouteValue::Physical(PhysicalTableRouteValue::new(vec![ + // Already migrated to the to_peer. + RegionRoute { + region: Region::new_test(RegionId::new(1024, 1)), + leader_peer: Some(Peer::empty(2)), + follower_peers: vec![], + leader_state: None, + leader_down_since: None, + }, + // Leader peer changed. + RegionRoute { + region: Region::new_test(RegionId::new(1024, 2)), + leader_peer: Some(Peer::empty(3)), + follower_peers: vec![], + leader_state: None, + leader_down_since: None, + }, + // Peer conflict. + RegionRoute { + region: Region::new_test(RegionId::new(1024, 3)), + leader_peer: Some(Peer::empty(1)), + follower_peers: vec![Peer::empty(2)], + leader_state: None, + leader_down_since: None, + }, + // Normal case. 
+ RegionRoute { + region: Region::new_test(RegionId::new(1024, 4)), + leader_peer: Some(Peer::empty(1)), + follower_peers: vec![], + leader_state: None, + leader_down_since: None, + }, + ])), + ) + .unwrap(); + + kv_backend.txn(txn).await.unwrap(); + let task = &RegionMigrationTaskBatch { + region_ids: vec![ + RegionId::new(1024, 1), + RegionId::new(1024, 2), + RegionId::new(1024, 3), + RegionId::new(1024, 4), + RegionId::new(1025, 1), + ], + from_peer: Peer::empty(1), + to_peer: Peer::empty(2), + timeout: Duration::from_millis(1000), + trigger_reason: RegionMigrationTriggerReason::Manual, + }; + let result = analyze_region_migration_task(task, &table_metadata_manager) + .await + .unwrap(); + assert_eq!( + result, + RegionMigrationAnalysis { + pending: vec![RegionId::new(1024, 4)], + migrated: vec![RegionId::new(1024, 1)], + leader_changed: vec![RegionId::new(1024, 2)], + peer_conflict: vec![RegionId::new(1024, 3)], + table_not_found: vec![RegionId::new(1025, 1)], + } + ); + } +} diff --git a/src/meta-srv/src/region/supervisor.rs b/src/meta-srv/src/region/supervisor.rs index 132aa8a7c5..866431dec1 100644 --- a/src/meta-srv/src/region/supervisor.rs +++ b/src/meta-srv/src/region/supervisor.rs @@ -32,7 +32,6 @@ use common_meta::rpc::store::RangeRequest; use common_runtime::JoinHandle; use common_telemetry::{debug, error, info, warn}; use common_time::util::current_time_millis; -use error::Error::{LeaderPeerChanged, MigrationRunning, RegionMigrated, TableRouteNotFound}; use futures::{StreamExt, TryStreamExt}; use snafu::{ResultExt, ensure}; use store_api::storage::RegionId; @@ -45,8 +44,9 @@ use crate::error::{self, Result}; use crate::failure_detector::PhiAccrualFailureDetectorOptions; use crate::metasrv::{RegionStatAwareSelectorRef, SelectTarget, SelectorContext, SelectorRef}; use crate::procedure::region_migration::manager::{ - RegionMigrationManagerRef, RegionMigrationTriggerReason, + RegionMigrationManagerRef, RegionMigrationTriggerReason, SubmitRegionMigrationTaskResult, }; +use crate::procedure::region_migration::utils::RegionMigrationTaskBatch; use crate::procedure::region_migration::{ DEFAULT_REGION_MIGRATION_TIMEOUT, RegionMigrationProcedureTask, }; @@ -575,11 +575,22 @@ impl RegionSupervisor { .await { Ok(tasks) => { + let mut grouped_tasks: HashMap<(u64, u64), Vec<_>> = HashMap::new(); for (task, count) in tasks { - let region_id = task.region_id; - let datanode_id = task.from_peer.id; - if let Err(err) = self.do_failover(task, count).await { - error!(err; "Failed to execute region failover for region: {}, datanode: {}", region_id, datanode_id); + grouped_tasks + .entry((task.from_peer.id, task.to_peer.id)) + .or_default() + .push((task, count)); + } + + for ((from_peer_id, to_peer_id), tasks) in grouped_tasks { + if tasks.is_empty() { + continue; + } + let task = RegionMigrationTaskBatch::from_tasks(tasks); + let region_ids = task.region_ids.clone(); + if let Err(err) = self.do_failover_tasks(task).await { + error!(err; "Failed to execute region failover for regions: {:?}, from_peer: {}, to_peer: {}", region_ids, from_peer_id, to_peer_id); } } } @@ -688,56 +699,92 @@ impl RegionSupervisor { Ok(tasks) } - async fn do_failover(&mut self, task: RegionMigrationProcedureTask, count: u32) -> Result<()> { + async fn do_failover_tasks(&mut self, task: RegionMigrationTaskBatch) -> Result<()> { let from_peer_id = task.from_peer.id; let to_peer_id = task.to_peer.id; - let region_id = task.region_id; + let timeout = task.timeout; + let trigger_reason = task.trigger_reason; + let result = self 
+ .region_migration_manager + .submit_region_migration_task(task) + .await?; + self.handle_submit_region_migration_task_result( + from_peer_id, + to_peer_id, + timeout, + trigger_reason, + result, + ) + .await + } - info!( - "Failover for region: {}, from_peer: {}, to_peer: {}, timeout: {:?}, tries: {}", - task.region_id, task.from_peer, task.to_peer, task.timeout, count - ); - - if let Err(err) = self.region_migration_manager.submit_procedure(task).await { - return match err { - RegionMigrated { .. } => { - info!( - "Region has been migrated to target peer: {}, removed failover detector for region: {}, datanode: {}", - to_peer_id, region_id, from_peer_id - ); - self.deregister_failure_detectors(vec![(from_peer_id, region_id)]) - .await; - Ok(()) - } - // Returns Ok if it's running or table is dropped. - MigrationRunning { .. } => { - info!( - "Another region migration is running, skip failover for region: {}, datanode: {}", - region_id, from_peer_id - ); - Ok(()) - } - TableRouteNotFound { .. } => { - self.deregister_failure_detectors(vec![(from_peer_id, region_id)]) - .await; - info!( - "Table route is not found, the table is dropped, removed failover detector for region: {}, datanode: {}", - region_id, from_peer_id - ); - Ok(()) - } - LeaderPeerChanged { .. } => { - self.deregister_failure_detectors(vec![(from_peer_id, region_id)]) - .await; - info!( - "Region's leader peer changed, removed failover detector for region: {}, datanode: {}", - region_id, from_peer_id - ); - Ok(()) - } - err => Err(err), - }; - }; + async fn handle_submit_region_migration_task_result( + &mut self, + from_peer_id: DatanodeId, + to_peer_id: DatanodeId, + timeout: Duration, + trigger_reason: RegionMigrationTriggerReason, + result: SubmitRegionMigrationTaskResult, + ) -> Result<()> { + if !result.migrated.is_empty() { + let detecting_regions = result + .migrated + .iter() + .map(|region_id| (from_peer_id, *region_id)) + .collect::>(); + self.deregister_failure_detectors(detecting_regions).await; + info!( + "Region has been migrated to target peer: {}, removed failover detectors for regions: {:?}", + to_peer_id, result.migrated, + ) + } + if !result.migrating.is_empty() { + info!( + "Region is still migrating, skipping failover for regions: {:?}", + result.migrating + ); + } + if !result.table_not_found.is_empty() { + let detecting_regions = result + .table_not_found + .iter() + .map(|region_id| (from_peer_id, *region_id)) + .collect::>(); + self.deregister_failure_detectors(detecting_regions).await; + info!( + "Table is not found, removed failover detectors for regions: {:?}", + result.table_not_found + ); + } + if !result.leader_changed.is_empty() { + let detecting_regions = result + .leader_changed + .iter() + .map(|region_id| (from_peer_id, *region_id)) + .collect::>(); + self.deregister_failure_detectors(detecting_regions).await; + info!( + "Region's leader peer changed, removed failover detectors for regions: {:?}", + result.leader_changed + ); + } + if !result.peer_conflict.is_empty() { + info!( + "Region has peer conflict, ignore failover for regions: {:?}", + result.peer_conflict + ); + } + if !result.submitted.is_empty() { + info!( + "Failover for regions: {:?}, from_peer: {}, to_peer: {}, procedure_id: {:?}, timeout: {:?}, trigger_reason: {:?}", + result.submitted, + from_peer_id, + to_peer_id, + result.procedure_id, + timeout, + trigger_reason, + ); + } Ok(()) } @@ -813,7 +860,10 @@ pub(crate) mod tests { use tokio::time::sleep; use super::RegionSupervisorSelector; - use 
crate::procedure::region_migration::manager::RegionMigrationManager; + use crate::procedure::region_migration::RegionMigrationTriggerReason; + use crate::procedure::region_migration::manager::{ + RegionMigrationManager, SubmitRegionMigrationTaskResult, + }; use crate::procedure::region_migration::test_util::TestingEnv; use crate::region::supervisor::{ DatanodeHeartbeat, Event, RegionFailureDetectorControl, RegionSupervisor, @@ -1087,4 +1137,172 @@ pub(crate) mod tests { sender.send(Event::Dump(tx)).await.unwrap(); assert!(rx.await.unwrap().is_empty()); } + + #[tokio::test] + async fn test_handle_submit_region_migration_task_result_migrated() { + common_telemetry::init_default_ut_logging(); + let (mut supervisor, _) = new_test_supervisor(); + let region_id = RegionId::new(1, 1); + let detecting_region = (1, region_id); + supervisor + .register_failure_detectors(vec![detecting_region]) + .await; + supervisor.failover_counts.insert(detecting_region, 1); + let result = SubmitRegionMigrationTaskResult { + migrated: vec![region_id], + ..Default::default() + }; + supervisor + .handle_submit_region_migration_task_result( + 1, + 2, + Duration::from_millis(1000), + RegionMigrationTriggerReason::Manual, + result, + ) + .await + .unwrap(); + assert!(!supervisor.failure_detector.contains(&detecting_region)); + assert!(supervisor.failover_counts.is_empty()); + } + + #[tokio::test] + async fn test_handle_submit_region_migration_task_result_migrating() { + common_telemetry::init_default_ut_logging(); + let (mut supervisor, _) = new_test_supervisor(); + let region_id = RegionId::new(1, 1); + let detecting_region = (1, region_id); + supervisor + .register_failure_detectors(vec![detecting_region]) + .await; + supervisor.failover_counts.insert(detecting_region, 1); + let result = SubmitRegionMigrationTaskResult { + migrating: vec![region_id], + ..Default::default() + }; + supervisor + .handle_submit_region_migration_task_result( + 1, + 2, + Duration::from_millis(1000), + RegionMigrationTriggerReason::Manual, + result, + ) + .await + .unwrap(); + assert!(supervisor.failure_detector.contains(&detecting_region)); + assert!(supervisor.failover_counts.contains_key(&detecting_region)); + } + + #[tokio::test] + async fn test_handle_submit_region_migration_task_result_table_not_found() { + common_telemetry::init_default_ut_logging(); + let (mut supervisor, _) = new_test_supervisor(); + let region_id = RegionId::new(1, 1); + let detecting_region = (1, region_id); + supervisor + .register_failure_detectors(vec![detecting_region]) + .await; + supervisor.failover_counts.insert(detecting_region, 1); + let result = SubmitRegionMigrationTaskResult { + table_not_found: vec![region_id], + ..Default::default() + }; + supervisor + .handle_submit_region_migration_task_result( + 1, + 2, + Duration::from_millis(1000), + RegionMigrationTriggerReason::Manual, + result, + ) + .await + .unwrap(); + assert!(!supervisor.failure_detector.contains(&detecting_region)); + assert!(supervisor.failover_counts.is_empty()); + } + + #[tokio::test] + async fn test_handle_submit_region_migration_task_result_leader_changed() { + common_telemetry::init_default_ut_logging(); + let (mut supervisor, _) = new_test_supervisor(); + let region_id = RegionId::new(1, 1); + let detecting_region = (1, region_id); + supervisor + .register_failure_detectors(vec![detecting_region]) + .await; + supervisor.failover_counts.insert(detecting_region, 1); + let result = SubmitRegionMigrationTaskResult { + leader_changed: vec![region_id], + ..Default::default() + }; + 
supervisor + .handle_submit_region_migration_task_result( + 1, + 2, + Duration::from_millis(1000), + RegionMigrationTriggerReason::Manual, + result, + ) + .await + .unwrap(); + assert!(!supervisor.failure_detector.contains(&detecting_region)); + assert!(supervisor.failover_counts.is_empty()); + } + + #[tokio::test] + async fn test_handle_submit_region_migration_task_result_peer_conflict() { + common_telemetry::init_default_ut_logging(); + let (mut supervisor, _) = new_test_supervisor(); + let region_id = RegionId::new(1, 1); + let detecting_region = (1, region_id); + supervisor + .register_failure_detectors(vec![detecting_region]) + .await; + supervisor.failover_counts.insert(detecting_region, 1); + let result = SubmitRegionMigrationTaskResult { + peer_conflict: vec![region_id], + ..Default::default() + }; + supervisor + .handle_submit_region_migration_task_result( + 1, + 2, + Duration::from_millis(1000), + RegionMigrationTriggerReason::Manual, + result, + ) + .await + .unwrap(); + assert!(supervisor.failure_detector.contains(&detecting_region)); + assert!(supervisor.failover_counts.contains_key(&detecting_region)); + } + + #[tokio::test] + async fn test_handle_submit_region_migration_task_result_submitted() { + common_telemetry::init_default_ut_logging(); + let (mut supervisor, _) = new_test_supervisor(); + let region_id = RegionId::new(1, 1); + let detecting_region = (1, region_id); + supervisor + .register_failure_detectors(vec![detecting_region]) + .await; + supervisor.failover_counts.insert(detecting_region, 1); + let result = SubmitRegionMigrationTaskResult { + submitted: vec![region_id], + ..Default::default() + }; + supervisor + .handle_submit_region_migration_task_result( + 1, + 2, + Duration::from_millis(1000), + RegionMigrationTriggerReason::Manual, + result, + ) + .await + .unwrap(); + assert!(supervisor.failure_detector.contains(&detecting_region)); + assert!(supervisor.failover_counts.contains_key(&detecting_region)); + } }
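A condensed sketch of the grouping step introduced in the supervisor above (assumes the types added by this patch; not additional production code): failover tasks are bucketed by `(from_peer, to_peer)` and each bucket becomes a single `RegionMigrationTaskBatch`.

use std::collections::HashMap;

use crate::procedure::region_migration::RegionMigrationProcedureTask;
use crate::procedure::region_migration::utils::RegionMigrationTaskBatch;

fn group_into_batches(
    tasks: Vec<(RegionMigrationProcedureTask, u32)>,
) -> Vec<RegionMigrationTaskBatch> {
    // Bucket failover tasks by (from_peer, to_peer); each bucket becomes one batch.
    let mut grouped: HashMap<(u64, u64), Vec<(RegionMigrationProcedureTask, u32)>> =
        HashMap::new();
    for (task, count) in tasks {
        grouped
            .entry((task.from_peer.id, task.to_peer.id))
            .or_default()
            .push((task, count));
    }
    grouped
        .into_values()
        // `from_tasks` reads the first task, so guard against empty buckets just
        // like the supervisor does before building a batch.
        .filter(|bucket| !bucket.is_empty())
        .map(RegionMigrationTaskBatch::from_tasks)
        .collect()
}

Inside `RegionMigrationTaskBatch::from_tasks`, the source and destination peers come from the first task, the batch timeout scales with the largest retry count in the bucket (`DEFAULT_REGION_MIGRATION_TIMEOUT * max_count`), and the trigger reason is fixed to `Failover`; because it indexes the first task, it panics on an empty input, which matches the supervisor's empty-bucket check.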