fix: remove obsolete failover detectors after region leader change (#5944)

* fix: remove obsolete failover detectors after region leader change

* chore: apply suggestions from CR

* fix: fix unit tests

* fix: fix unit test

* fix: failover logic
This commit is contained in:
Weny Xu
2025-04-22 14:15:47 +08:00
committed by GitHub
parent d9437c6da7
commit 0a4594c9e2
4 changed files with 43 additions and 13 deletions

View File

@@ -336,6 +336,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Region's leader peer changed: {}", msg))]
LeaderPeerChanged {
msg: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid arguments: {}", err_msg))]
InvalidArguments {
err_msg: String,
@@ -914,7 +921,8 @@ impl ErrorExt for Error {
| Error::ProcedureNotFound { .. }
| Error::TooManyPartitions { .. }
| Error::TomlFormat { .. }
| Error::HandlerNotFound { .. } => StatusCode::InvalidArguments,
| Error::HandlerNotFound { .. }
| Error::LeaderPeerChanged { .. } => StatusCode::InvalidArguments,
Error::LeaseKeyFromUtf8 { .. }
| Error::LeaseValueFromUtf8 { .. }
| Error::InvalidRegionKeyFromUtf8 { .. }

View File

@@ -267,8 +267,8 @@ impl RegionMigrationManager {
ensure!(
leader_peer.id == task.from_peer.id,
error::InvalidArgumentsSnafu {
err_msg: format!(
error::LeaderPeerChangedSnafu {
msg: format!(
"Region's leader peer({}) is not the `from_peer`({}), region: {}",
leader_peer.id, task.from_peer.id, task.region_id
),
@@ -507,8 +507,8 @@ mod test {
.await;
let err = manager.submit_procedure(task).await.unwrap_err();
assert_matches!(err, error::Error::InvalidArguments { .. });
assert_eq!(err.to_string(), "Invalid arguments: Region's leader peer(3) is not the `from_peer`(1), region: 4398046511105(1024, 1)");
assert_matches!(err, error::Error::LeaderPeerChanged { .. });
assert_eq!(err.to_string(), "Region's leader peer changed: Region's leader peer(3) is not the `from_peer`(1), region: 4398046511105(1024, 1)");
}
#[tokio::test]

View File

@@ -26,7 +26,7 @@ use common_meta::DatanodeId;
use common_runtime::JoinHandle;
use common_telemetry::{error, info, warn};
use common_time::util::current_time_millis;
use error::Error::{MigrationRunning, TableRouteNotFound};
use error::Error::{LeaderPeerChanged, MigrationRunning, TableRouteNotFound};
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};
@@ -363,15 +363,15 @@ impl RegionSupervisor {
for (datanode_id, region_id) in migrating_regions {
self.failure_detector.remove(&(datanode_id, region_id));
warn!(
"Removed region failover for region: {region_id}, datanode: {datanode_id} because it's migrating"
);
}
warn!("Detects region failures: {:?}", regions);
for (datanode_id, region_id) in regions {
match self.do_failover(datanode_id, region_id).await {
Ok(_) => self.failure_detector.remove(&(datanode_id, region_id)),
Err(err) => {
error!(err; "Failed to execute region failover for region: {region_id}, datanode: {datanode_id}");
}
if let Err(err) = self.do_failover(datanode_id, region_id).await {
error!(err; "Failed to execute region failover for region: {region_id}, datanode: {datanode_id}");
}
}
}
@@ -421,7 +421,29 @@ impl RegionSupervisor {
if let Err(err) = self.region_migration_manager.submit_procedure(task).await {
return match err {
// Returns Ok if it's running or table is dropped.
MigrationRunning { .. } | TableRouteNotFound { .. } => Ok(()),
MigrationRunning { .. } => {
info!(
"Another region migration is running, skip failover for region: {}, datanode: {}",
region_id, datanode_id
);
Ok(())
}
TableRouteNotFound { .. } => {
self.failure_detector.remove(&(datanode_id, region_id));
info!(
"Table route is not found, the table is dropped, removed failover detector for region: {}, datanode: {}",
region_id, datanode_id
);
Ok(())
}
LeaderPeerChanged { .. } => {
self.failure_detector.remove(&(datanode_id, region_id));
info!(
"Region's leader peer changed, removed failover detector for region: {}, datanode: {}",
region_id, datanode_id
);
Ok(())
}
err => Err(err),
};
};

View File

@@ -847,7 +847,7 @@ pub async fn test_region_migration_incorrect_from_peer(
assert!(matches!(
err,
meta_srv::error::Error::InvalidArguments { .. }
meta_srv::error::Error::LeaderPeerChanged { .. }
));
}