From 1e394af583366a6499dc662a355a0f373d060a7d Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Fri, 18 Apr 2025 19:13:01 +0800 Subject: [PATCH] feat: prevent migrating a leader region to a peer that already has a region follower (#5923) * feat: prevent migrating a leader region to a peer that already has a region follower * chore: refine err msg --- src/meta-srv/Cargo.toml | 1 + .../src/procedure/region_migration/manager.rs | 62 +++++++++++++++++-- 2 files changed, 58 insertions(+), 5 deletions(-) diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index 15830a4f05..0e0f3e5cde 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -83,6 +83,7 @@ chrono.workspace = true client = { workspace = true, features = ["testing"] } common-meta = { workspace = true, features = ["testing"] } common-procedure-test.workspace = true +common-wal = { workspace = true, features = ["testing"] } session.workspace = true tracing = "0.1" tracing-subscriber.workspace = true diff --git a/src/meta-srv/src/procedure/region_migration/manager.rs b/src/meta-srv/src/procedure/region_migration/manager.rs index e2345559d0..417881f549 100644 --- a/src/meta-srv/src/procedure/region_migration/manager.rs +++ b/src/meta-srv/src/procedure/region_migration/manager.rs @@ -268,13 +268,35 @@ impl RegionMigrationManager { ensure!( leader_peer.id == task.from_peer.id, error::InvalidArgumentsSnafu { - err_msg: "Invalid region migration `from_peer` argument" + err_msg: format!( + "Region's leader peer({}) is not the `from_peer`({}), region: {}", + leader_peer.id, task.from_peer.id, task.region_id + ), } ); Ok(()) } + /// Throws an error if `to_peer` is already has a region follower. + fn verify_region_follower_peers( + &self, + region_route: &RegionRoute, + task: &RegionMigrationProcedureTask, + ) -> Result<()> { + ensure!( + !region_route.follower_peers.contains(&task.to_peer), + error::InvalidArgumentsSnafu { + err_msg: format!( + "The `to_peer`({}) is already has a region follower, region: {}", + task.to_peer.id, task.region_id + ), + }, + ); + + Ok(()) + } + /// Submits a new region migration procedure. pub async fn submit_procedure( &self, @@ -308,7 +330,7 @@ impl RegionMigrationManager { } self.verify_region_leader_peer(®ion_route, &task)?; - + self.verify_region_follower_peers(®ion_route, &task)?; let table_info = self.retrieve_table_info(region_id).await?; let TableName { catalog_name, @@ -486,9 +508,39 @@ mod test { let err = manager.submit_procedure(task).await.unwrap_err(); assert_matches!(err, error::Error::InvalidArguments { .. }); - assert!(err - .to_string() - .contains("Invalid region migration `from_peer` argument")); + assert_eq!(err.to_string(), "Invalid arguments: Region's leader peer(3) is not the `from_peer`(1), region: 4398046511105(1024, 1)"); + } + + #[tokio::test] + async fn test_submit_procedure_region_follower_on_to_peer() { + let env = TestingEnv::new(); + let context_factory = env.context_factory(); + let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory); + let region_id = RegionId::new(1024, 1); + let task = RegionMigrationProcedureTask { + region_id, + from_peer: Peer::empty(3), + to_peer: Peer::empty(2), + timeout: Duration::from_millis(1000), + }; + + let table_info = new_test_table_info(1024, vec![1]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(Peer::empty(3)), + follower_peers: vec![Peer::empty(2)], + ..Default::default() + }]; + + env.create_physical_table_metadata(table_info, region_routes) + .await; + + let err = manager.submit_procedure(task).await.unwrap_err(); + assert_matches!(err, error::Error::InvalidArguments { .. }); + assert_eq!( + err.to_string(), + "Invalid arguments: The `to_peer`(2) is already has a region follower, region: 4398046511105(1024, 1)" + ); } #[tokio::test]