mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-08 06:12:55 +00:00
feat: prevent migrating a leader region to a peer that already has a region follower (#5923)
* feat: prevent migrating a leader region to a peer that already has a region follower * chore: refine err msg
This commit is contained in:
@@ -83,6 +83,7 @@ chrono.workspace = true
|
||||
client = { workspace = true, features = ["testing"] }
|
||||
common-meta = { workspace = true, features = ["testing"] }
|
||||
common-procedure-test.workspace = true
|
||||
common-wal = { workspace = true, features = ["testing"] }
|
||||
session.workspace = true
|
||||
tracing = "0.1"
|
||||
tracing-subscriber.workspace = true
|
||||
|
||||
@@ -268,13 +268,35 @@ impl RegionMigrationManager {
|
||||
ensure!(
|
||||
leader_peer.id == task.from_peer.id,
|
||||
error::InvalidArgumentsSnafu {
|
||||
err_msg: "Invalid region migration `from_peer` argument"
|
||||
err_msg: format!(
|
||||
"Region's leader peer({}) is not the `from_peer`({}), region: {}",
|
||||
leader_peer.id, task.from_peer.id, task.region_id
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Throws an error if `to_peer` is already has a region follower.
|
||||
fn verify_region_follower_peers(
|
||||
&self,
|
||||
region_route: &RegionRoute,
|
||||
task: &RegionMigrationProcedureTask,
|
||||
) -> Result<()> {
|
||||
ensure!(
|
||||
!region_route.follower_peers.contains(&task.to_peer),
|
||||
error::InvalidArgumentsSnafu {
|
||||
err_msg: format!(
|
||||
"The `to_peer`({}) is already has a region follower, region: {}",
|
||||
task.to_peer.id, task.region_id
|
||||
),
|
||||
},
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Submits a new region migration procedure.
|
||||
pub async fn submit_procedure(
|
||||
&self,
|
||||
@@ -308,7 +330,7 @@ impl RegionMigrationManager {
|
||||
}
|
||||
|
||||
self.verify_region_leader_peer(®ion_route, &task)?;
|
||||
|
||||
self.verify_region_follower_peers(®ion_route, &task)?;
|
||||
let table_info = self.retrieve_table_info(region_id).await?;
|
||||
let TableName {
|
||||
catalog_name,
|
||||
@@ -486,9 +508,39 @@ mod test {
|
||||
|
||||
let err = manager.submit_procedure(task).await.unwrap_err();
|
||||
assert_matches!(err, error::Error::InvalidArguments { .. });
|
||||
assert!(err
|
||||
.to_string()
|
||||
.contains("Invalid region migration `from_peer` argument"));
|
||||
assert_eq!(err.to_string(), "Invalid arguments: Region's leader peer(3) is not the `from_peer`(1), region: 4398046511105(1024, 1)");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_submit_procedure_region_follower_on_to_peer() {
|
||||
let env = TestingEnv::new();
|
||||
let context_factory = env.context_factory();
|
||||
let manager = RegionMigrationManager::new(env.procedure_manager().clone(), context_factory);
|
||||
let region_id = RegionId::new(1024, 1);
|
||||
let task = RegionMigrationProcedureTask {
|
||||
region_id,
|
||||
from_peer: Peer::empty(3),
|
||||
to_peer: Peer::empty(2),
|
||||
timeout: Duration::from_millis(1000),
|
||||
};
|
||||
|
||||
let table_info = new_test_table_info(1024, vec![1]).into();
|
||||
let region_routes = vec![RegionRoute {
|
||||
region: Region::new_test(region_id),
|
||||
leader_peer: Some(Peer::empty(3)),
|
||||
follower_peers: vec![Peer::empty(2)],
|
||||
..Default::default()
|
||||
}];
|
||||
|
||||
env.create_physical_table_metadata(table_info, region_routes)
|
||||
.await;
|
||||
|
||||
let err = manager.submit_procedure(task).await.unwrap_err();
|
||||
assert_matches!(err, error::Error::InvalidArguments { .. });
|
||||
assert_eq!(
|
||||
err.to_string(),
|
||||
"Invalid arguments: The `to_peer`(2) is already has a region follower, region: 4398046511105(1024, 1)"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
Reference in New Issue
Block a user