feat: batch region migration for failover (#7245)

* refactor: support multiple rows per event in event recorder

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: batch region migration for failover

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: add tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: fix unit tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
Author: Weny Xu
Date: 2025-11-26 16:31:56 +08:00
Committed by: GitHub
Commit: 69865c831d (parent 713525797a)
11 changed files with 1331 additions and 158 deletions


@@ -97,9 +97,9 @@ pub trait Event: Send + Sync + Debug {
vec![]
}
/// Add the extra row to the event with the default row.
fn extra_row(&self) -> Result<Row> {
Ok(Row { values: vec![] })
/// Returns the extra rows of the event. Defaults to a single empty row.
fn extra_rows(&self) -> Result<Vec<Row>> {
Ok(vec![Row { values: vec![] }])
}
/// Returns the event as any type.
@@ -159,15 +159,17 @@ pub fn build_row_inserts_request(events: &[&Box<dyn Event>]) -> Result<RowInsert
let mut rows: Vec<Row> = Vec::with_capacity(events.len());
for event in events {
let extra_row = event.extra_row()?;
let mut values = Vec::with_capacity(3 + extra_row.values.len());
values.extend([
ValueData::StringValue(event.event_type().to_string()).into(),
ValueData::BinaryValue(event.json_payload()?.into_bytes()).into(),
ValueData::TimestampNanosecondValue(event.timestamp().value()).into(),
]);
values.extend(extra_row.values);
rows.push(Row { values });
let extra_rows = event.extra_rows()?;
for extra_row in extra_rows {
let mut values = Vec::with_capacity(3 + extra_row.values.len());
values.extend([
ValueData::StringValue(event.event_type().to_string()).into(),
ValueData::BinaryValue(event.json_payload()?.into_bytes()).into(),
ValueData::TimestampNanosecondValue(event.timestamp().value()).into(),
]);
values.extend(extra_row.values);
rows.push(Row { values });
}
}
Ok(RowInsertRequests {

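Not part of the commit — just an illustrative sketch of how the reworked trait fans a single event out into several inserted rows. `DemoEvent` and its column are hypothetical, and the import path of `build_row_inserts_request` is assumed to sit next to `Event` in `common_event_recorder`; everything else mirrors the hunk above.

use std::any::Any;

use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType};
use common_event_recorder::error::Result;
use common_event_recorder::{Event, build_row_inserts_request};

#[derive(Debug)]
struct DemoEvent;

impl Event for DemoEvent {
    fn event_type(&self) -> &str {
        "demo_event"
    }

    fn extra_schema(&self) -> Vec<ColumnSchema> {
        vec![ColumnSchema {
            column_name: "demo_column".to_string(),
            datatype: ColumnDataType::String.into(),
            semantic_type: SemanticType::Field.into(),
            ..Default::default()
        }]
    }

    // One logical event now contributes two extra rows.
    fn extra_rows(&self) -> Result<Vec<Row>> {
        Ok(vec![
            Row { values: vec![ValueData::StringValue("first".to_string()).into()] },
            Row { values: vec![ValueData::StringValue("second".to_string()).into()] },
        ])
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

fn demo() -> Result<()> {
    let event: Box<dyn Event> = Box::new(DemoEvent);
    // The builder prepends event_type, json payload and timestamp to every
    // extra row, so this single event expands into two inserted rows.
    let _request = build_row_inserts_request(&[&event])?;
    Ok(())
}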

@@ -107,8 +107,8 @@ impl Event for SlowQueryEvent {
]
}
fn extra_row(&self) -> Result<Row> {
Ok(Row {
fn extra_rows(&self) -> Result<Vec<Row>> {
Ok(vec![Row {
values: vec![
ValueData::U64Value(self.cost).into(),
ValueData::U64Value(self.threshold).into(),
@@ -119,7 +119,7 @@ impl Event for SlowQueryEvent {
ValueData::TimestampMillisecondValue(self.promql_start.unwrap_or(0)).into(),
ValueData::TimestampMillisecondValue(self.promql_end.unwrap_or(0)).into(),
],
})
}])
}
fn json_payload(&self) -> Result<String> {


@@ -92,25 +92,96 @@ impl Event for ProcedureEvent {
schema
}
fn extra_row(&self) -> Result<Row> {
let error_str = match &self.state {
ProcedureState::Failed { error } => format!("{:?}", error),
ProcedureState::PrepareRollback { error } => format!("{:?}", error),
ProcedureState::RollingBack { error } => format!("{:?}", error),
ProcedureState::Retrying { error } => format!("{:?}", error),
ProcedureState::Poisoned { error, .. } => format!("{:?}", error),
_ => "".to_string(),
};
let mut row = vec![
ValueData::StringValue(self.procedure_id.to_string()).into(),
ValueData::StringValue(self.state.as_str_name().to_string()).into(),
ValueData::StringValue(error_str).into(),
];
row.append(&mut self.internal_event.extra_row()?.values);
Ok(Row { values: row })
fn extra_rows(&self) -> Result<Vec<Row>> {
let mut internal_event_extra_rows = self.internal_event.extra_rows()?;
let mut rows = Vec::with_capacity(internal_event_extra_rows.len());
for internal_event_extra_row in internal_event_extra_rows.iter_mut() {
let error_str = match &self.state {
ProcedureState::Failed { error } => format!("{:?}", error),
ProcedureState::PrepareRollback { error } => format!("{:?}", error),
ProcedureState::RollingBack { error } => format!("{:?}", error),
ProcedureState::Retrying { error } => format!("{:?}", error),
ProcedureState::Poisoned { error, .. } => format!("{:?}", error),
_ => "".to_string(),
};
let mut values = Vec::with_capacity(3 + internal_event_extra_row.values.len());
values.extend([
ValueData::StringValue(self.procedure_id.to_string()).into(),
ValueData::StringValue(self.state.as_str_name().to_string()).into(),
ValueData::StringValue(error_str).into(),
]);
values.append(&mut internal_event_extra_row.values);
rows.push(Row { values });
}
Ok(rows)
}
fn as_any(&self) -> &dyn Any {
self
}
}
#[cfg(test)]
mod tests {
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, Row, SemanticType};
use common_event_recorder::Event;
use crate::{ProcedureEvent, ProcedureId, ProcedureState};
#[derive(Debug)]
struct TestEvent;
impl Event for TestEvent {
fn event_type(&self) -> &str {
"test_event"
}
fn extra_schema(&self) -> Vec<ColumnSchema> {
vec![ColumnSchema {
column_name: "test_event_column".to_string(),
datatype: ColumnDataType::String.into(),
semantic_type: SemanticType::Field.into(),
..Default::default()
}]
}
fn extra_rows(&self) -> common_event_recorder::error::Result<Vec<Row>> {
Ok(vec![
Row {
values: vec![ValueData::StringValue("test_event1".to_string()).into()],
},
Row {
values: vec![ValueData::StringValue("test_event2".to_string()).into()],
},
])
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
}
#[test]
fn test_procedure_event_extra_rows() {
let procedure_event = ProcedureEvent::new(
ProcedureId::random(),
Box::new(TestEvent {}),
ProcedureState::Running,
);
let procedure_event_extra_rows = procedure_event.extra_rows().unwrap();
assert_eq!(procedure_event_extra_rows.len(), 2);
assert_eq!(procedure_event_extra_rows[0].values.len(), 4);
assert_eq!(
procedure_event_extra_rows[0].values[3],
ValueData::StringValue("test_event1".to_string()).into()
);
assert_eq!(procedure_event_extra_rows[1].values.len(), 4);
assert_eq!(
procedure_event_extra_rows[1].values[3],
ValueData::StringValue("test_event2".to_string()).into()
);
}
}


@@ -21,7 +21,7 @@ use common_event_recorder::Event;
use common_event_recorder::error::{Result, SerializeEventSnafu};
use serde::Serialize;
use snafu::ResultExt;
use store_api::storage::{RegionId, TableId};
use store_api::storage::RegionId;
use crate::procedure::region_migration::{PersistentContext, RegionMigrationTriggerReason};
@@ -37,37 +37,34 @@ pub const EVENTS_TABLE_DST_NODE_ID_COLUMN_NAME: &str = "region_migration_dst_nod
pub const EVENTS_TABLE_DST_PEER_ADDR_COLUMN_NAME: &str = "region_migration_dst_peer_addr";
/// RegionMigrationEvent is the event of region migration.
#[derive(Debug, Serialize)]
#[derive(Debug)]
pub(crate) struct RegionMigrationEvent {
#[serde(skip)]
region_id: RegionId,
#[serde(skip)]
table_id: TableId,
#[serde(skip)]
region_number: u32,
#[serde(skip)]
// The region ids of the region migration.
region_ids: Vec<RegionId>,
// The trigger reason of the region migration.
trigger_reason: RegionMigrationTriggerReason,
#[serde(skip)]
// The source node id of the region migration.
src_node_id: u64,
#[serde(skip)]
// The source peer address of the region migration.
src_peer_addr: String,
#[serde(skip)]
// The destination node id of the region migration.
dst_node_id: u64,
#[serde(skip)]
// The destination peer address of the region migration.
dst_peer_addr: String,
// The timeout of the region migration.
timeout: Duration,
}
// The following fields will be serialized as the json payload.
#[derive(Debug, Serialize)]
struct Payload {
#[serde(with = "humantime_serde")]
timeout: Duration,
}
impl RegionMigrationEvent {
pub fn from_persistent_ctx(ctx: &PersistentContext) -> Self {
// FIXME(weny): handle multiple region ids.
let region_id = ctx.region_ids[0];
Self {
region_id,
table_id: region_id.table_id(),
region_number: region_id.region_number(),
region_ids: ctx.region_ids.clone(),
trigger_reason: ctx.trigger_reason,
src_node_id: ctx.from_peer.id,
src_peer_addr: ctx.from_peer.addr.clone(),
@@ -136,23 +133,31 @@ impl Event for RegionMigrationEvent {
]
}
fn extra_row(&self) -> Result<Row> {
Ok(Row {
values: vec![
ValueData::U64Value(self.region_id.as_u64()).into(),
ValueData::U32Value(self.table_id).into(),
ValueData::U32Value(self.region_number).into(),
ValueData::StringValue(self.trigger_reason.to_string()).into(),
ValueData::U64Value(self.src_node_id).into(),
ValueData::StringValue(self.src_peer_addr.clone()).into(),
ValueData::U64Value(self.dst_node_id).into(),
ValueData::StringValue(self.dst_peer_addr.clone()).into(),
],
})
fn extra_rows(&self) -> Result<Vec<Row>> {
let mut extra_rows = Vec::with_capacity(self.region_ids.len());
for region_id in &self.region_ids {
extra_rows.push(Row {
values: vec![
ValueData::U64Value(region_id.as_u64()).into(),
ValueData::U32Value(region_id.table_id()).into(),
ValueData::U32Value(region_id.region_number()).into(),
ValueData::StringValue(self.trigger_reason.to_string()).into(),
ValueData::U64Value(self.src_node_id).into(),
ValueData::StringValue(self.src_peer_addr.clone()).into(),
ValueData::U64Value(self.dst_node_id).into(),
ValueData::StringValue(self.dst_peer_addr.clone()).into(),
],
});
}
Ok(extra_rows)
}
fn json_payload(&self) -> Result<String> {
serde_json::to_string(self).context(SerializeEventSnafu)
serde_json::to_string(&Payload {
timeout: self.timeout,
})
.context(SerializeEventSnafu)
}
fn as_any(&self) -> &dyn Any {

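Again only a sketch, not part of the change: a unit test one might add beside RegionMigrationEvent (assuming the test module lives in the same file, so the pub(crate) type is in scope via `super`). It exercises the behavior introduced above: one extra row per migrating region, with only the timeout left in the JSON payload.

use std::time::Duration;

use common_event_recorder::Event;
use common_meta::peer::Peer;
use store_api::storage::RegionId;

use super::RegionMigrationEvent;
use crate::procedure::region_migration::{PersistentContext, RegionMigrationTriggerReason};

#[test]
fn region_migration_event_emits_one_row_per_region() {
    let ctx = PersistentContext::new(
        vec![("greptime".to_string(), "public".to_string())],
        Peer::empty(1),
        Peer::empty(2),
        vec![RegionId::new(1024, 1), RegionId::new(1024, 2)],
        Duration::from_secs(10),
        RegionMigrationTriggerReason::Manual,
    );
    let event = RegionMigrationEvent::from_persistent_ctx(&ctx);
    // One extra row per migrating region; all rows share the same peers and reason.
    assert_eq!(event.extra_rows().unwrap().len(), 2);
    // The JSON payload now carries only the timeout.
    assert!(event.json_payload().unwrap().contains("timeout"));
}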

@@ -24,6 +24,7 @@ pub(crate) mod open_candidate_region;
pub mod test_util;
pub(crate) mod update_metadata;
pub(crate) mod upgrade_candidate_region;
pub(crate) mod utils;
use std::any::Any;
use std::collections::{HashMap, HashSet};
@@ -100,9 +101,16 @@ where
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistentContext {
/// The table catalog.
pub(crate) catalog: String,
#[deprecated(note = "use `catalog_and_schema` instead")]
#[serde(default, skip_serializing_if = "Option::is_none")]
pub(crate) catalog: Option<String>,
/// The table schema.
pub(crate) schema: String,
#[deprecated(note = "use `catalog_and_schema` instead")]
#[serde(default, skip_serializing_if = "Option::is_none")]
pub(crate) schema: Option<String>,
/// The catalog and schema of the regions.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub(crate) catalog_and_schema: Vec<(String, String)>,
/// The [Peer] of migration source.
pub(crate) from_peer: Peer,
/// The [Peer] of migration destination.
@@ -118,15 +126,47 @@ pub struct PersistentContext {
pub(crate) trigger_reason: RegionMigrationTriggerReason,
}
impl PersistentContext {
pub fn new(
catalog_and_schema: Vec<(String, String)>,
from_peer: Peer,
to_peer: Peer,
region_ids: Vec<RegionId>,
timeout: Duration,
trigger_reason: RegionMigrationTriggerReason,
) -> Self {
#[allow(deprecated)]
Self {
catalog: None,
schema: None,
catalog_and_schema,
from_peer,
to_peer,
region_ids,
timeout,
trigger_reason,
}
}
}
fn default_timeout() -> Duration {
Duration::from_secs(10)
}
impl PersistentContext {
pub fn lock_key(&self) -> Vec<StringKey> {
let mut lock_keys = Vec::with_capacity(self.region_ids.len() + 2);
lock_keys.push(CatalogLock::Read(&self.catalog).into());
lock_keys.push(SchemaLock::read(&self.catalog, &self.schema).into());
let mut lock_keys =
Vec::with_capacity(self.region_ids.len() + 2 + self.catalog_and_schema.len() * 2);
#[allow(deprecated)]
if let (Some(catalog), Some(schema)) = (&self.catalog, &self.schema) {
lock_keys.push(CatalogLock::Read(catalog).into());
lock_keys.push(SchemaLock::read(catalog, schema).into());
}
for (catalog, schema) in self.catalog_and_schema.iter() {
lock_keys.push(CatalogLock::Read(catalog).into());
lock_keys.push(SchemaLock::read(catalog, schema).into());
}
// Sort the region ids to ensure a consistent ordering.
let mut region_ids = self.region_ids.clone();
region_ids.sort_unstable();
@@ -928,13 +968,24 @@ mod tests {
let procedure = RegionMigrationProcedure::new(persistent_context, context, vec![]);
let serialized = procedure.dump().unwrap();
let expected = r#"{"persistent_ctx":{"catalog":"greptime","schema":"public","from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_ids":[4398046511105],"timeout":"10s","trigger_reason":"Unknown"},"state":{"region_migration_state":"RegionMigrationStart"}}"#;
let expected = r#"{"persistent_ctx":{"catalog_and_schema":[["greptime","public"]],"from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_ids":[4398046511105],"timeout":"10s","trigger_reason":"Unknown"},"state":{"region_migration_state":"RegionMigrationStart"}}"#;
assert_eq!(expected, serialized);
}
#[test]
fn test_backward_compatibility() {
let persistent_ctx = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
let persistent_ctx = PersistentContext {
#[allow(deprecated)]
catalog: Some("greptime".into()),
#[allow(deprecated)]
schema: Some("public".into()),
catalog_and_schema: vec![],
from_peer: Peer::empty(1),
to_peer: Peer::empty(2),
region_ids: vec![RegionId::new(1024, 1)],
timeout: Duration::from_secs(10),
trigger_reason: RegionMigrationTriggerReason::default(),
};
// NOTE: Changing this will break backward compatibility.
let serialized = r#"{"catalog":"greptime","schema":"public","from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105}"#;
let deserialized: PersistentContext = serde_json::from_str(serialized).unwrap();

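A hypothetical extra assertion one could add next to the backward-compatibility test above (same tests module, same imports), checking that lock_key covers both layouts: the deprecated catalog/schema pair on a context deserialized from old state, and catalog_and_schema on a context built through the new constructor. It assumes lock_key emits one region lock per region id, as the hunk above suggests.

use std::time::Duration;

use common_meta::peer::Peer;
use store_api::storage::RegionId;

#[test]
fn lock_key_covers_old_and_new_layouts() {
    // Old persisted state with the deprecated `catalog`/`schema` fields.
    let old = r#"{"catalog":"greptime","schema":"public","from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105}"#;
    let old_ctx: PersistentContext = serde_json::from_str(old).unwrap();

    // New state built through the constructor populates `catalog_and_schema`.
    let new_ctx = PersistentContext::new(
        vec![("greptime".to_string(), "public".to_string())],
        Peer::empty(1),
        Peer::empty(2),
        vec![RegionId::new(1024, 1)],
        Duration::from_secs(10),
        RegionMigrationTriggerReason::default(),
    );

    // Both layouts yield a catalog lock, a schema lock, and one region lock.
    assert_eq!(old_ctx.lock_key().len(), 3);
    assert_eq!(new_ctx.lock_key().len(), 3);
}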

@@ -389,15 +389,14 @@ mod tests {
};
fn new_persistent_context() -> PersistentContext {
PersistentContext {
catalog: "greptime".into(),
schema: "public".into(),
from_peer: Peer::empty(1),
to_peer: Peer::empty(2),
region_ids: vec![RegionId::new(1024, 1)],
timeout: Duration::from_millis(1000),
trigger_reason: RegionMigrationTriggerReason::Manual,
}
PersistentContext::new(
vec![("greptime".into(), "public".into())],
Peer::empty(1),
Peer::empty(2),
vec![RegionId::new(1024, 1)],
Duration::from_millis(1000),
RegionMigrationTriggerReason::Manual,
)
}
async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap<u32, String>) {


@@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::sync::{Arc, RwLock};
use std::time::Duration;
@@ -31,6 +31,9 @@ use table::table_name::TableName;
use crate::error::{self, Result};
use crate::metrics::{METRIC_META_REGION_MIGRATION_DATANODES, METRIC_META_REGION_MIGRATION_FAIL};
use crate::procedure::region_migration::utils::{
RegionMigrationAnalysis, RegionMigrationTaskBatch, analyze_region_migration_task,
};
use crate::procedure::region_migration::{
DefaultContextFactory, PersistentContext, RegionMigrationProcedure,
};
@@ -99,6 +102,7 @@ impl Drop for RegionMigrationProcedureGuard {
}
}
/// A task of region migration procedure.
#[derive(Debug, Clone)]
pub struct RegionMigrationProcedureTask {
pub(crate) region_id: RegionId,
@@ -151,6 +155,25 @@ impl Display for RegionMigrationProcedureTask {
}
}
/// The result of submitting a region migration task.
#[derive(Debug, Default, PartialEq, Eq)]
pub struct SubmitRegionMigrationTaskResult {
/// Regions already migrated to the `to_peer`.
pub migrated: Vec<RegionId>,
/// Regions where the leader peer has changed.
pub leader_changed: Vec<RegionId>,
/// Regions where `to_peer` is already a follower (conflict).
pub peer_conflict: Vec<RegionId>,
/// Regions whose table is not found.
pub table_not_found: Vec<RegionId>,
/// Regions still pending migration.
pub migrating: Vec<RegionId>,
/// Regions that have been submitted for migration.
pub submitted: Vec<RegionId>,
/// The procedure id of the region migration procedure.
pub procedure_id: Option<ProcedureId>,
}
impl RegionMigrationManager {
/// Returns new [`RegionMigrationManager`]
pub(crate) fn new(
@@ -332,6 +355,168 @@ impl RegionMigrationManager {
Ok(())
}
/// Extracts the regions in the migration task that already have running migration procedures.
///
/// Returns the region ids that are already migrating along with the newly created procedure guards.
/// The already-running regions are removed from the [`RegionMigrationTaskBatch`].
fn extract_running_regions(
&self,
task: &mut RegionMigrationTaskBatch,
) -> (Vec<RegionId>, Vec<RegionMigrationProcedureGuard>) {
let mut migrating_region_ids = Vec::new();
let mut procedure_guards = Vec::with_capacity(task.region_ids.len());
for region_id in &task.region_ids {
let Some(guard) = self.insert_running_procedure(&RegionMigrationProcedureTask::new(
*region_id,
task.from_peer.clone(),
task.to_peer.clone(),
task.timeout,
task.trigger_reason,
)) else {
migrating_region_ids.push(*region_id);
continue;
};
procedure_guards.push(guard);
}
let migrating_set = migrating_region_ids.iter().cloned().collect::<HashSet<_>>();
task.region_ids.retain(|id| !migrating_set.contains(id));
(migrating_region_ids, procedure_guards)
}
pub async fn submit_region_migration_task(
&self,
mut task: RegionMigrationTaskBatch,
) -> Result<SubmitRegionMigrationTaskResult> {
let (migrating_region_ids, procedure_guards) = self.extract_running_regions(&mut task);
let RegionMigrationAnalysis {
migrated,
leader_changed,
peer_conflict,
mut table_not_found,
pending,
} = analyze_region_migration_task(&task, &self.context_factory.table_metadata_manager)
.await?;
if pending.is_empty() {
return Ok(SubmitRegionMigrationTaskResult {
migrated,
leader_changed,
peer_conflict,
table_not_found,
migrating: migrating_region_ids,
submitted: vec![],
procedure_id: None,
});
}
// Updates the region ids to the pending region ids.
task.region_ids = pending;
let table_regions = task.table_regions();
let table_ids = table_regions.keys().cloned().collect::<Vec<_>>();
let table_info_values = self
.context_factory
.table_metadata_manager
.table_info_manager()
.batch_get(&table_ids)
.await
.context(error::TableMetadataManagerSnafu)?;
let mut catalog_and_schema = Vec::with_capacity(table_info_values.len());
for (table_id, regions) in table_regions {
match table_info_values.get(&table_id) {
Some(table_info) => {
let TableName {
catalog_name,
schema_name,
..
} = table_info.table_name();
catalog_and_schema.push((catalog_name, schema_name));
}
None => {
task.region_ids.retain(|id| id.table_id() != table_id);
table_not_found.extend(regions);
}
}
}
if task.region_ids.is_empty() {
return Ok(SubmitRegionMigrationTaskResult {
migrated,
leader_changed,
peer_conflict,
table_not_found,
migrating: migrating_region_ids,
submitted: vec![],
procedure_id: None,
});
}
let submitting_region_ids = task.region_ids.clone();
let procedure_id = self
.submit_procedure_inner(task, procedure_guards, catalog_and_schema)
.await?;
Ok(SubmitRegionMigrationTaskResult {
migrated,
leader_changed,
peer_conflict,
table_not_found,
migrating: migrating_region_ids,
submitted: submitting_region_ids,
procedure_id: Some(procedure_id),
})
}
async fn submit_procedure_inner(
&self,
task: RegionMigrationTaskBatch,
procedure_guards: Vec<RegionMigrationProcedureGuard>,
catalog_and_schema: Vec<(String, String)>,
) -> Result<ProcedureId> {
let procedure = RegionMigrationProcedure::new(
PersistentContext::new(
catalog_and_schema,
task.from_peer.clone(),
task.to_peer.clone(),
task.region_ids.clone(),
task.timeout,
task.trigger_reason,
),
self.context_factory.clone(),
procedure_guards,
);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
let procedure_id = procedure_with_id.id;
info!("Starting region migration procedure {procedure_id} for {task}");
let procedure_manager = self.procedure_manager.clone();
let num_region = task.region_ids.len();
common_runtime::spawn_global(async move {
let watcher = &mut match procedure_manager.submit(procedure_with_id).await {
Ok(watcher) => watcher,
Err(e) => {
error!(e; "Failed to submit region migration procedure {procedure_id} for {task}");
return;
}
};
METRIC_META_REGION_MIGRATION_DATANODES
.with_label_values(&["src", &task.from_peer.id.to_string()])
.inc_by(num_region as u64);
METRIC_META_REGION_MIGRATION_DATANODES
.with_label_values(&["desc", &task.to_peer.id.to_string()])
.inc_by(num_region as u64);
if let Err(e) = watcher::wait(watcher).await {
error!(e; "Failed to wait region migration procedure {procedure_id} for {task}");
METRIC_META_REGION_MIGRATION_FAIL.inc();
return;
}
info!("Region migration procedure {procedure_id} for {task} is finished successfully!");
});
Ok(procedure_id)
}
/// Submits a new region migration procedure.
pub async fn submit_procedure(
&self,
@@ -384,15 +569,14 @@ impl RegionMigrationManager {
trigger_reason,
} = task.clone();
let procedure = RegionMigrationProcedure::new(
PersistentContext {
catalog: catalog_name,
schema: schema_name,
region_ids: vec![region_id],
PersistentContext::new(
vec![(catalog_name, schema_name)],
from_peer,
to_peer,
vec![region_id],
timeout,
trigger_reason,
},
),
self.context_factory.clone(),
vec![guard],
);
@@ -645,4 +829,162 @@ mod test {
assert_matches!(err, error::Error::Unexpected { .. });
}
#[tokio::test]
async fn test_submit_procedure_with_multiple_regions_invalid_task() {
let env = TestingEnv::new();
let context_factory = env.context_factory();
let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
let task = RegionMigrationTaskBatch {
region_ids: vec![RegionId::new(1024, 1)],
from_peer: Peer::empty(1),
to_peer: Peer::empty(1),
timeout: Duration::from_millis(1000),
trigger_reason: RegionMigrationTriggerReason::Manual,
};
let err = manager
.submit_region_migration_task(task)
.await
.unwrap_err();
assert_matches!(err, error::Error::InvalidArguments { .. });
}
#[tokio::test]
async fn test_submit_procedure_with_multiple_regions_no_region_to_migrate() {
common_telemetry::init_default_ut_logging();
let env = TestingEnv::new();
let context_factory = env.context_factory();
let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
let region_id = RegionId::new(1024, 1);
let task = RegionMigrationTaskBatch {
region_ids: vec![region_id],
from_peer: Peer::empty(1),
to_peer: Peer::empty(2),
timeout: Duration::from_millis(1000),
trigger_reason: RegionMigrationTriggerReason::Manual,
};
let table_info = new_test_table_info(1024, vec![1]).into();
let region_routes = vec![RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(Peer::empty(2)),
..Default::default()
}];
env.create_physical_table_metadata(table_info, region_routes)
.await;
let result = manager.submit_region_migration_task(task).await.unwrap();
assert_eq!(
result,
SubmitRegionMigrationTaskResult {
migrated: vec![region_id],
..Default::default()
}
);
}
#[tokio::test]
async fn test_submit_procedure_with_multiple_regions_leader_peer_changed() {
let env = TestingEnv::new();
let context_factory = env.context_factory();
let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
let region_id = RegionId::new(1024, 1);
let task = RegionMigrationTaskBatch {
region_ids: vec![region_id],
from_peer: Peer::empty(1),
to_peer: Peer::empty(2),
timeout: Duration::from_millis(1000),
trigger_reason: RegionMigrationTriggerReason::Manual,
};
let table_info = new_test_table_info(1024, vec![1]).into();
let region_routes = vec![RegionRoute {
region: Region::new_test(RegionId::new(1024, 1)),
leader_peer: Some(Peer::empty(3)),
..Default::default()
}];
env.create_physical_table_metadata(table_info, region_routes)
.await;
let result = manager.submit_region_migration_task(task).await.unwrap();
assert_eq!(
result,
SubmitRegionMigrationTaskResult {
leader_changed: vec![region_id],
..Default::default()
}
);
}
#[tokio::test]
async fn test_submit_procedure_with_multiple_regions_peer_conflict() {
let env = TestingEnv::new();
let context_factory = env.context_factory();
let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
let region_id = RegionId::new(1024, 1);
let task = RegionMigrationTaskBatch {
region_ids: vec![region_id],
from_peer: Peer::empty(3),
to_peer: Peer::empty(2),
timeout: Duration::from_millis(1000),
trigger_reason: RegionMigrationTriggerReason::Manual,
};
let table_info = new_test_table_info(1024, vec![1]).into();
let region_routes = vec![RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(Peer::empty(3)),
follower_peers: vec![Peer::empty(2)],
..Default::default()
}];
env.create_physical_table_metadata(table_info, region_routes)
.await;
let result = manager.submit_region_migration_task(task).await.unwrap();
assert_eq!(
result,
SubmitRegionMigrationTaskResult {
peer_conflict: vec![region_id],
..Default::default()
}
);
}
#[tokio::test]
async fn test_running_regions() {
let env = TestingEnv::new();
let context_factory = env.context_factory();
let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
let region_id = RegionId::new(1024, 1);
let task = RegionMigrationTaskBatch {
region_ids: vec![region_id, RegionId::new(1024, 2)],
from_peer: Peer::empty(1),
to_peer: Peer::empty(2),
timeout: Duration::from_millis(1000),
trigger_reason: RegionMigrationTriggerReason::Manual,
};
// Inserts one
manager.tracker.running_procedures.write().unwrap().insert(
region_id,
RegionMigrationProcedureTask::new(
region_id,
task.from_peer.clone(),
task.to_peer.clone(),
task.timeout,
task.trigger_reason,
),
);
let table_info = new_test_table_info(1024, vec![1]).into();
let region_routes = vec![RegionRoute {
region: Region::new_test(RegionId::new(1024, 2)),
leader_peer: Some(Peer::empty(1)),
..Default::default()
}];
env.create_physical_table_metadata(table_info, region_routes)
.await;
let result = manager.submit_region_migration_task(task).await.unwrap();
assert_eq!(result.migrating, vec![region_id]);
assert_eq!(result.submitted, vec![RegionId::new(1024, 2)]);
assert!(result.procedure_id.is_some());
}
}


@@ -185,15 +185,14 @@ impl TestingEnv {
/// Generates a [PersistentContext].
pub fn new_persistent_context(from: u64, to: u64, region_id: RegionId) -> PersistentContext {
PersistentContext {
catalog: "greptime".into(),
schema: "public".into(),
from_peer: Peer::empty(from),
to_peer: Peer::empty(to),
region_ids: vec![region_id],
timeout: Duration::from_secs(10),
trigger_reason: RegionMigrationTriggerReason::default(),
}
PersistentContext::new(
vec![("greptime".into(), "public".into())],
Peer::empty(from),
Peer::empty(to),
vec![region_id],
Duration::from_secs(10),
RegionMigrationTriggerReason::default(),
)
}
/// The test suite for region migration procedure.


@@ -369,15 +369,14 @@ mod tests {
};
fn new_persistent_context() -> PersistentContext {
PersistentContext {
catalog: "greptime".into(),
schema: "public".into(),
from_peer: Peer::empty(1),
to_peer: Peer::empty(2),
region_ids: vec![RegionId::new(1024, 1)],
timeout: Duration::from_millis(1000),
trigger_reason: RegionMigrationTriggerReason::Manual,
}
PersistentContext::new(
vec![("greptime".into(), "public".into())],
Peer::empty(1),
Peer::empty(2),
vec![RegionId::new(1024, 1)],
Duration::from_millis(1000),
RegionMigrationTriggerReason::Manual,
)
}
async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap<u32, String>) {


@@ -0,0 +1,487 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::fmt::Display;
use std::time::Duration;
use common_meta::key::TableMetadataManagerRef;
use common_meta::peer::Peer;
use common_meta::rpc::router::RegionRoute;
use itertools::Itertools;
use snafu::{OptionExt, ResultExt};
use store_api::storage::{RegionId, TableId};
use crate::error::{self, Result};
use crate::procedure::region_migration::{
DEFAULT_REGION_MIGRATION_TIMEOUT, RegionMigrationProcedureTask, RegionMigrationTriggerReason,
};
/// A migration task describing how regions are intended to move between peers.
#[derive(Debug, Clone)]
pub struct RegionMigrationTaskBatch {
/// Region ids involved in this migration.
pub region_ids: Vec<RegionId>,
/// Source peer where regions currently reside.
pub from_peer: Peer,
/// Destination peer to migrate regions to.
pub to_peer: Peer,
/// Timeout for migration.
pub timeout: Duration,
/// Reason why this migration was triggered.
pub trigger_reason: RegionMigrationTriggerReason,
}
impl RegionMigrationTaskBatch {
/// Constructs a [`RegionMigrationTaskBatch`] from a vector of region migration procedure tasks.
///
/// Aggregates the region ids, takes the source and destination peers from the first task,
/// scales the timeout by the largest failover count, and marks the batch as triggered by failover.
///
/// # Panics
/// Panics if `tasks` is empty.
pub fn from_tasks(tasks: Vec<(RegionMigrationProcedureTask, u32)>) -> Self {
let max_count = tasks.iter().map(|(_, count)| *count).max().unwrap_or(1);
let region_ids = tasks.iter().map(|(r, _)| r.region_id).collect::<Vec<_>>();
let from_peer = tasks[0].0.from_peer.clone();
let to_peer = tasks[0].0.to_peer.clone();
let timeout = DEFAULT_REGION_MIGRATION_TIMEOUT * max_count;
let trigger_reason = RegionMigrationTriggerReason::Failover;
Self {
region_ids,
from_peer,
to_peer,
timeout,
trigger_reason,
}
}
}
impl Display for RegionMigrationTaskBatch {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"RegionMigrationTask {{ region_ids: {:?}, from_peer: {:?}, to_peer: {:?}, timeout: {:?}, trigger_reason: {:?} }}",
self.region_ids, self.from_peer, self.to_peer, self.timeout, self.trigger_reason
)
}
}
impl RegionMigrationTaskBatch {
/// Returns the table regions map.
///
/// The key is the table id, the value is the region ids of the table.
pub(crate) fn table_regions(&self) -> HashMap<TableId, Vec<RegionId>> {
let mut table_regions = HashMap::new();
for region_id in &self.region_ids {
table_regions
.entry(region_id.table_id())
.or_insert_with(Vec::new)
.push(*region_id);
}
table_regions
}
}
/// Represents the result of analyzing a migration task.
#[derive(Debug, Clone, Default, PartialEq)]
pub(crate) struct RegionMigrationAnalysis {
/// Regions already migrated to the `to_peer`.
pub(crate) migrated: Vec<RegionId>,
/// Regions where the leader peer has changed.
pub(crate) leader_changed: Vec<RegionId>,
/// Regions where `to_peer` is already a follower (conflict).
pub(crate) peer_conflict: Vec<RegionId>,
/// Regions whose table is not found.
pub(crate) table_not_found: Vec<RegionId>,
/// Regions still pending migration.
pub(crate) pending: Vec<RegionId>,
}
fn leader_peer(region_route: &RegionRoute) -> Result<&Peer> {
region_route
.leader_peer
.as_ref()
.with_context(|| error::UnexpectedSnafu {
violated: format!(
"Region route leader peer is not found in region({})",
region_route.region.id
),
})
}
/// Returns true if the region has already been migrated to `to_peer`.
fn has_migrated(region_route: &RegionRoute, to_peer_id: u64) -> Result<bool> {
if region_route.is_leader_downgrading() {
return Ok(false);
}
let leader_peer = leader_peer(region_route)?;
Ok(leader_peer.id == to_peer_id)
}
/// Returns true if the leader peer of the region has changed.
fn has_leader_changed(region_route: &RegionRoute, from_peer_id: u64) -> Result<bool> {
let leader_peer = leader_peer(region_route)?;
Ok(leader_peer.id != from_peer_id)
}
/// Returns true if `to_peer` is already a follower of the region (conflict).
fn has_peer_conflict(region_route: &RegionRoute, to_peer_id: u64) -> bool {
region_route
.follower_peers
.iter()
.map(|p| p.id)
.contains(&to_peer_id)
}
/// Updates the analysis result based on a single region route.
fn update_result_with_region_route(
result: &mut RegionMigrationAnalysis,
region_route: &RegionRoute,
from_peer_id: u64,
to_peer_id: u64,
) -> Result<()> {
if has_migrated(region_route, to_peer_id)? {
result.migrated.push(region_route.region.id);
return Ok(());
}
if has_leader_changed(region_route, from_peer_id)? {
result.leader_changed.push(region_route.region.id);
return Ok(());
}
if has_peer_conflict(region_route, to_peer_id) {
result.peer_conflict.push(region_route.region.id);
return Ok(());
}
result.pending.push(region_route.region.id);
Ok(())
}
/// Analyzes the migration task and categorizes regions by their current state.
///
/// Returns a [`RegionMigrationAnalysis`] describing the migration status.
pub async fn analyze_region_migration_task(
task: &RegionMigrationTaskBatch,
table_metadata_manager: &TableMetadataManagerRef,
) -> Result<RegionMigrationAnalysis> {
if task.to_peer.id == task.from_peer.id {
return error::InvalidArgumentsSnafu {
err_msg: format!(
"The `from_peer_id`({}) can't equal `to_peer_id`({})",
task.from_peer.id, task.to_peer.id
),
}
.fail();
}
let table_regions = task.table_regions();
let table_ids = table_regions.keys().cloned().collect::<Vec<_>>();
let mut result = RegionMigrationAnalysis::default();
let table_routes = table_metadata_manager
.table_route_manager()
.table_route_storage()
.batch_get_with_raw_bytes(&table_ids)
.await
.context(error::TableMetadataManagerSnafu)?;
for (table_id, table_route) in table_ids.into_iter().zip(table_routes) {
let region_ids = table_regions.get(&table_id).unwrap();
let Some(table_route) = table_route else {
result.table_not_found.extend(region_ids);
continue;
};
// Returns an error if the table route is not a physical table route.
let region_routes = table_route.region_routes().with_context(|_| {
error::UnexpectedLogicalRouteTableSnafu {
err_msg: format!("TableRoute({table_id:?}) is a non-physical TableRouteValue."),
}
})?;
for region_route in region_routes
.iter()
.filter(|r| region_ids.contains(&r.region.id))
{
update_result_with_region_route(
&mut result,
region_route,
task.from_peer.id,
task.to_peer.id,
)?;
}
}
Ok(result)
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::sync::Arc;
use std::time::Duration;
use common_meta::key::TableMetadataManager;
use common_meta::key::table_route::{
LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteValue,
};
use common_meta::kv_backend::TxnService;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use store_api::storage::RegionId;
use crate::error::Error;
use crate::procedure::region_migration::RegionMigrationTriggerReason;
use crate::procedure::region_migration::utils::{
RegionMigrationAnalysis, RegionMigrationTaskBatch, analyze_region_migration_task,
update_result_with_region_route,
};
#[test]
fn test_update_result_with_region_route() {
// The region is already migrated to the to_peer.
let mut result = RegionMigrationAnalysis::default();
let region_id = RegionId::new(1, 1);
let region_route = RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(Peer::empty(1)),
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
};
update_result_with_region_route(&mut result, &region_route, 2, 1).unwrap();
assert_eq!(
result,
RegionMigrationAnalysis {
migrated: vec![region_id],
..Default::default()
}
);
// Test region leader changed.
let mut result = RegionMigrationAnalysis::default();
let region_id = RegionId::new(1, 1);
let region_route = RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(Peer::empty(1)),
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
};
update_result_with_region_route(&mut result, &region_route, 2, 3).unwrap();
assert_eq!(
result,
RegionMigrationAnalysis {
leader_changed: vec![region_id],
..Default::default()
}
);
// Test region peer conflict.
let mut result = RegionMigrationAnalysis::default();
let region_id = RegionId::new(1, 1);
let region_route = RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(Peer::empty(1)),
follower_peers: vec![Peer::empty(2)],
leader_state: None,
leader_down_since: None,
};
update_result_with_region_route(&mut result, &region_route, 1, 2).unwrap();
assert_eq!(
result,
RegionMigrationAnalysis {
peer_conflict: vec![region_id],
..Default::default()
}
);
// Test normal case.
let mut result = RegionMigrationAnalysis::default();
let region_id = RegionId::new(1, 1);
let region_route = RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(Peer::empty(1)),
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
};
update_result_with_region_route(&mut result, &region_route, 1, 3).unwrap();
assert_eq!(
result,
RegionMigrationAnalysis {
pending: vec![region_id],
..Default::default()
}
);
// Test leader peer not set
let mut result = RegionMigrationAnalysis::default();
let region_id = RegionId::new(1, 1);
let region_route = RegionRoute {
region: Region::new_test(region_id),
leader_peer: None,
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
};
let err = update_result_with_region_route(&mut result, &region_route, 1, 3).unwrap_err();
assert_matches!(err, Error::Unexpected { .. });
}
#[tokio::test]
async fn test_analyze_region_migration_task_invalid_task() {
let task = &RegionMigrationTaskBatch {
region_ids: vec![RegionId::new(1, 1)],
from_peer: Peer::empty(1),
to_peer: Peer::empty(1),
timeout: Duration::from_millis(1000),
trigger_reason: RegionMigrationTriggerReason::Manual,
};
let kv_backend = Arc::new(MemoryKvBackend::default());
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
let err = analyze_region_migration_task(task, &table_metadata_manager)
.await
.unwrap_err();
assert_matches!(err, Error::InvalidArguments { .. });
}
#[tokio::test]
async fn test_analyze_region_migration_table_not_found() {
let task = &RegionMigrationTaskBatch {
region_ids: vec![RegionId::new(1, 1)],
from_peer: Peer::empty(1),
to_peer: Peer::empty(2),
timeout: Duration::from_millis(1000),
trigger_reason: RegionMigrationTriggerReason::Manual,
};
let kv_backend = Arc::new(MemoryKvBackend::default());
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
let result = analyze_region_migration_task(task, &table_metadata_manager)
.await
.unwrap();
assert_eq!(
result,
RegionMigrationAnalysis {
table_not_found: vec![RegionId::new(1, 1)],
..Default::default()
}
);
}
#[tokio::test]
async fn test_analyze_region_migration_unexpected_logical_table() {
let kv_backend = Arc::new(MemoryKvBackend::default());
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
let (txn, _) = table_metadata_manager
.table_route_manager()
.table_route_storage()
.build_create_txn(
1024,
&TableRouteValue::Logical(LogicalTableRouteValue::new(
1024,
vec![RegionId::new(1023, 1)],
)),
)
.unwrap();
kv_backend.txn(txn).await.unwrap();
let task = &RegionMigrationTaskBatch {
region_ids: vec![RegionId::new(1024, 1)],
from_peer: Peer::empty(1),
to_peer: Peer::empty(2),
timeout: Duration::from_millis(1000),
trigger_reason: RegionMigrationTriggerReason::Manual,
};
let err = analyze_region_migration_task(task, &table_metadata_manager)
.await
.unwrap_err();
assert_matches!(err, Error::UnexpectedLogicalRouteTable { .. });
}
#[tokio::test]
async fn test_analyze_region_migration_normal_case() {
let kv_backend = Arc::new(MemoryKvBackend::default());
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
let (txn, _) = table_metadata_manager
.table_route_manager()
.table_route_storage()
.build_create_txn(
1024,
&TableRouteValue::Physical(PhysicalTableRouteValue::new(vec![
// Already migrated to the to_peer.
RegionRoute {
region: Region::new_test(RegionId::new(1024, 1)),
leader_peer: Some(Peer::empty(2)),
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
},
// Leader peer changed.
RegionRoute {
region: Region::new_test(RegionId::new(1024, 2)),
leader_peer: Some(Peer::empty(3)),
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
},
// Peer conflict.
RegionRoute {
region: Region::new_test(RegionId::new(1024, 3)),
leader_peer: Some(Peer::empty(1)),
follower_peers: vec![Peer::empty(2)],
leader_state: None,
leader_down_since: None,
},
// Normal case.
RegionRoute {
region: Region::new_test(RegionId::new(1024, 4)),
leader_peer: Some(Peer::empty(1)),
follower_peers: vec![],
leader_state: None,
leader_down_since: None,
},
])),
)
.unwrap();
kv_backend.txn(txn).await.unwrap();
let task = &RegionMigrationTaskBatch {
region_ids: vec![
RegionId::new(1024, 1),
RegionId::new(1024, 2),
RegionId::new(1024, 3),
RegionId::new(1024, 4),
RegionId::new(1025, 1),
],
from_peer: Peer::empty(1),
to_peer: Peer::empty(2),
timeout: Duration::from_millis(1000),
trigger_reason: RegionMigrationTriggerReason::Manual,
};
let result = analyze_region_migration_task(task, &table_metadata_manager)
.await
.unwrap();
assert_eq!(
result,
RegionMigrationAnalysis {
pending: vec![RegionId::new(1024, 4)],
migrated: vec![RegionId::new(1024, 1)],
leader_changed: vec![RegionId::new(1024, 2)],
peer_conflict: vec![RegionId::new(1024, 3)],
table_not_found: vec![RegionId::new(1025, 1)],
}
);
}
}

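A sketch of a test one might add to the module above, showing how from_tasks aggregates failover tasks and scales the batch timeout by the largest retry count. It reuses the imports already present in this tests module and only pulls in the task type and default timeout from the parent module.

use crate::procedure::region_migration::{
    DEFAULT_REGION_MIGRATION_TIMEOUT, RegionMigrationProcedureTask,
};

#[test]
fn from_tasks_scales_timeout_by_max_count() {
    let task = |n: u32| {
        RegionMigrationProcedureTask::new(
            RegionId::new(1024, n),
            Peer::empty(1),
            Peer::empty(2),
            DEFAULT_REGION_MIGRATION_TIMEOUT,
            RegionMigrationTriggerReason::Failover,
        )
    };
    // Per-region failover counts are 1, 3 and 2; the batch keeps all regions
    // and multiplies the default timeout by the largest count.
    let batch =
        RegionMigrationTaskBatch::from_tasks(vec![(task(1), 1), (task(2), 3), (task(3), 2)]);
    assert_eq!(batch.region_ids.len(), 3);
    assert_eq!(batch.timeout, DEFAULT_REGION_MIGRATION_TIMEOUT * 3);
    assert_eq!(batch.trigger_reason, RegionMigrationTriggerReason::Failover);
}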

@@ -32,7 +32,6 @@ use common_meta::rpc::store::RangeRequest;
use common_runtime::JoinHandle;
use common_telemetry::{debug, error, info, warn};
use common_time::util::current_time_millis;
use error::Error::{LeaderPeerChanged, MigrationRunning, RegionMigrated, TableRouteNotFound};
use futures::{StreamExt, TryStreamExt};
use snafu::{ResultExt, ensure};
use store_api::storage::RegionId;
@@ -45,8 +44,9 @@ use crate::error::{self, Result};
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::metasrv::{RegionStatAwareSelectorRef, SelectTarget, SelectorContext, SelectorRef};
use crate::procedure::region_migration::manager::{
RegionMigrationManagerRef, RegionMigrationTriggerReason,
RegionMigrationManagerRef, RegionMigrationTriggerReason, SubmitRegionMigrationTaskResult,
};
use crate::procedure::region_migration::utils::RegionMigrationTaskBatch;
use crate::procedure::region_migration::{
DEFAULT_REGION_MIGRATION_TIMEOUT, RegionMigrationProcedureTask,
};
@@ -575,11 +575,22 @@ impl RegionSupervisor {
.await
{
Ok(tasks) => {
let mut grouped_tasks: HashMap<(u64, u64), Vec<_>> = HashMap::new();
for (task, count) in tasks {
let region_id = task.region_id;
let datanode_id = task.from_peer.id;
if let Err(err) = self.do_failover(task, count).await {
error!(err; "Failed to execute region failover for region: {}, datanode: {}", region_id, datanode_id);
grouped_tasks
.entry((task.from_peer.id, task.to_peer.id))
.or_default()
.push((task, count));
}
for ((from_peer_id, to_peer_id), tasks) in grouped_tasks {
if tasks.is_empty() {
continue;
}
let task = RegionMigrationTaskBatch::from_tasks(tasks);
let region_ids = task.region_ids.clone();
if let Err(err) = self.do_failover_tasks(task).await {
error!(err; "Failed to execute region failover for regions: {:?}, from_peer: {}, to_peer: {}", region_ids, from_peer_id, to_peer_id);
}
}
}
@@ -688,56 +699,92 @@ impl RegionSupervisor {
Ok(tasks)
}
async fn do_failover(&mut self, task: RegionMigrationProcedureTask, count: u32) -> Result<()> {
async fn do_failover_tasks(&mut self, task: RegionMigrationTaskBatch) -> Result<()> {
let from_peer_id = task.from_peer.id;
let to_peer_id = task.to_peer.id;
let region_id = task.region_id;
let timeout = task.timeout;
let trigger_reason = task.trigger_reason;
let result = self
.region_migration_manager
.submit_region_migration_task(task)
.await?;
self.handle_submit_region_migration_task_result(
from_peer_id,
to_peer_id,
timeout,
trigger_reason,
result,
)
.await
}
info!(
"Failover for region: {}, from_peer: {}, to_peer: {}, timeout: {:?}, tries: {}",
task.region_id, task.from_peer, task.to_peer, task.timeout, count
);
if let Err(err) = self.region_migration_manager.submit_procedure(task).await {
return match err {
RegionMigrated { .. } => {
info!(
"Region has been migrated to target peer: {}, removed failover detector for region: {}, datanode: {}",
to_peer_id, region_id, from_peer_id
);
self.deregister_failure_detectors(vec![(from_peer_id, region_id)])
.await;
Ok(())
}
// Returns Ok if it's running or table is dropped.
MigrationRunning { .. } => {
info!(
"Another region migration is running, skip failover for region: {}, datanode: {}",
region_id, from_peer_id
);
Ok(())
}
TableRouteNotFound { .. } => {
self.deregister_failure_detectors(vec![(from_peer_id, region_id)])
.await;
info!(
"Table route is not found, the table is dropped, removed failover detector for region: {}, datanode: {}",
region_id, from_peer_id
);
Ok(())
}
LeaderPeerChanged { .. } => {
self.deregister_failure_detectors(vec![(from_peer_id, region_id)])
.await;
info!(
"Region's leader peer changed, removed failover detector for region: {}, datanode: {}",
region_id, from_peer_id
);
Ok(())
}
err => Err(err),
};
};
async fn handle_submit_region_migration_task_result(
&mut self,
from_peer_id: DatanodeId,
to_peer_id: DatanodeId,
timeout: Duration,
trigger_reason: RegionMigrationTriggerReason,
result: SubmitRegionMigrationTaskResult,
) -> Result<()> {
if !result.migrated.is_empty() {
let detecting_regions = result
.migrated
.iter()
.map(|region_id| (from_peer_id, *region_id))
.collect::<Vec<_>>();
self.deregister_failure_detectors(detecting_regions).await;
info!(
"Region has been migrated to target peer: {}, removed failover detectors for regions: {:?}",
to_peer_id, result.migrated,
)
}
if !result.migrating.is_empty() {
info!(
"Region is still migrating, skipping failover for regions: {:?}",
result.migrating
);
}
if !result.table_not_found.is_empty() {
let detecting_regions = result
.table_not_found
.iter()
.map(|region_id| (from_peer_id, *region_id))
.collect::<Vec<_>>();
self.deregister_failure_detectors(detecting_regions).await;
info!(
"Table is not found, removed failover detectors for regions: {:?}",
result.table_not_found
);
}
if !result.leader_changed.is_empty() {
let detecting_regions = result
.leader_changed
.iter()
.map(|region_id| (from_peer_id, *region_id))
.collect::<Vec<_>>();
self.deregister_failure_detectors(detecting_regions).await;
info!(
"Region's leader peer changed, removed failover detectors for regions: {:?}",
result.leader_changed
);
}
if !result.peer_conflict.is_empty() {
info!(
"Region has peer conflict, ignore failover for regions: {:?}",
result.peer_conflict
);
}
if !result.submitted.is_empty() {
info!(
"Failover for regions: {:?}, from_peer: {}, to_peer: {}, procedure_id: {:?}, timeout: {:?}, trigger_reason: {:?}",
result.submitted,
from_peer_id,
to_peer_id,
result.procedure_id,
timeout,
trigger_reason,
);
}
Ok(())
}
@@ -813,7 +860,10 @@ pub(crate) mod tests {
use tokio::time::sleep;
use super::RegionSupervisorSelector;
use crate::procedure::region_migration::manager::RegionMigrationManager;
use crate::procedure::region_migration::RegionMigrationTriggerReason;
use crate::procedure::region_migration::manager::{
RegionMigrationManager, SubmitRegionMigrationTaskResult,
};
use crate::procedure::region_migration::test_util::TestingEnv;
use crate::region::supervisor::{
DatanodeHeartbeat, Event, RegionFailureDetectorControl, RegionSupervisor,
@@ -1087,4 +1137,172 @@ pub(crate) mod tests {
sender.send(Event::Dump(tx)).await.unwrap();
assert!(rx.await.unwrap().is_empty());
}
#[tokio::test]
async fn test_handle_submit_region_migration_task_result_migrated() {
common_telemetry::init_default_ut_logging();
let (mut supervisor, _) = new_test_supervisor();
let region_id = RegionId::new(1, 1);
let detecting_region = (1, region_id);
supervisor
.register_failure_detectors(vec![detecting_region])
.await;
supervisor.failover_counts.insert(detecting_region, 1);
let result = SubmitRegionMigrationTaskResult {
migrated: vec![region_id],
..Default::default()
};
supervisor
.handle_submit_region_migration_task_result(
1,
2,
Duration::from_millis(1000),
RegionMigrationTriggerReason::Manual,
result,
)
.await
.unwrap();
assert!(!supervisor.failure_detector.contains(&detecting_region));
assert!(supervisor.failover_counts.is_empty());
}
#[tokio::test]
async fn test_handle_submit_region_migration_task_result_migrating() {
common_telemetry::init_default_ut_logging();
let (mut supervisor, _) = new_test_supervisor();
let region_id = RegionId::new(1, 1);
let detecting_region = (1, region_id);
supervisor
.register_failure_detectors(vec![detecting_region])
.await;
supervisor.failover_counts.insert(detecting_region, 1);
let result = SubmitRegionMigrationTaskResult {
migrating: vec![region_id],
..Default::default()
};
supervisor
.handle_submit_region_migration_task_result(
1,
2,
Duration::from_millis(1000),
RegionMigrationTriggerReason::Manual,
result,
)
.await
.unwrap();
assert!(supervisor.failure_detector.contains(&detecting_region));
assert!(supervisor.failover_counts.contains_key(&detecting_region));
}
#[tokio::test]
async fn test_handle_submit_region_migration_task_result_table_not_found() {
common_telemetry::init_default_ut_logging();
let (mut supervisor, _) = new_test_supervisor();
let region_id = RegionId::new(1, 1);
let detecting_region = (1, region_id);
supervisor
.register_failure_detectors(vec![detecting_region])
.await;
supervisor.failover_counts.insert(detecting_region, 1);
let result = SubmitRegionMigrationTaskResult {
table_not_found: vec![region_id],
..Default::default()
};
supervisor
.handle_submit_region_migration_task_result(
1,
2,
Duration::from_millis(1000),
RegionMigrationTriggerReason::Manual,
result,
)
.await
.unwrap();
assert!(!supervisor.failure_detector.contains(&detecting_region));
assert!(supervisor.failover_counts.is_empty());
}
#[tokio::test]
async fn test_handle_submit_region_migration_task_result_leader_changed() {
common_telemetry::init_default_ut_logging();
let (mut supervisor, _) = new_test_supervisor();
let region_id = RegionId::new(1, 1);
let detecting_region = (1, region_id);
supervisor
.register_failure_detectors(vec![detecting_region])
.await;
supervisor.failover_counts.insert(detecting_region, 1);
let result = SubmitRegionMigrationTaskResult {
leader_changed: vec![region_id],
..Default::default()
};
supervisor
.handle_submit_region_migration_task_result(
1,
2,
Duration::from_millis(1000),
RegionMigrationTriggerReason::Manual,
result,
)
.await
.unwrap();
assert!(!supervisor.failure_detector.contains(&detecting_region));
assert!(supervisor.failover_counts.is_empty());
}
#[tokio::test]
async fn test_handle_submit_region_migration_task_result_peer_conflict() {
common_telemetry::init_default_ut_logging();
let (mut supervisor, _) = new_test_supervisor();
let region_id = RegionId::new(1, 1);
let detecting_region = (1, region_id);
supervisor
.register_failure_detectors(vec![detecting_region])
.await;
supervisor.failover_counts.insert(detecting_region, 1);
let result = SubmitRegionMigrationTaskResult {
peer_conflict: vec![region_id],
..Default::default()
};
supervisor
.handle_submit_region_migration_task_result(
1,
2,
Duration::from_millis(1000),
RegionMigrationTriggerReason::Manual,
result,
)
.await
.unwrap();
assert!(supervisor.failure_detector.contains(&detecting_region));
assert!(supervisor.failover_counts.contains_key(&detecting_region));
}
#[tokio::test]
async fn test_handle_submit_region_migration_task_result_submitted() {
common_telemetry::init_default_ut_logging();
let (mut supervisor, _) = new_test_supervisor();
let region_id = RegionId::new(1, 1);
let detecting_region = (1, region_id);
supervisor
.register_failure_detectors(vec![detecting_region])
.await;
supervisor.failover_counts.insert(detecting_region, 1);
let result = SubmitRegionMigrationTaskResult {
submitted: vec![region_id],
..Default::default()
};
supervisor
.handle_submit_region_migration_task_result(
1,
2,
Duration::from_millis(1000),
RegionMigrationTriggerReason::Manual,
result,
)
.await
.unwrap();
assert!(supervisor.failure_detector.contains(&detecting_region));
assert!(supervisor.failover_counts.contains_key(&detecting_region));
}
}
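
Finally, a toy standalone sketch (not part of the commit) of the grouping step the supervisor performs above, using plain tuples in place of RegionMigrationProcedureTask. Failed regions are grouped by (from_peer, to_peer) so that each group becomes a single batch procedure instead of one procedure per region.

use std::collections::HashMap;

fn main() {
    // (region_id, from_peer_id, to_peer_id) stand in for the real failover tasks.
    let tasks = vec![(1u64, 1u64, 2u64), (2, 1, 2), (3, 1, 3)];

    let mut grouped: HashMap<(u64, u64), Vec<u64>> = HashMap::new();
    for (region, from, to) in tasks {
        grouped.entry((from, to)).or_default().push(region);
    }

    // Two batches get submitted: regions [1, 2] move to peer 2, region [3] to peer 3.
    assert_eq!(grouped.len(), 2);
    assert_eq!(grouped[&(1, 2)], vec![1, 2]);
    assert_eq!(grouped[&(1, 3)], vec![3]);
}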