From 0a4594c9e20aed8cdf0438422d075994c3a657dd Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Tue, 22 Apr 2025 14:15:47 +0800 Subject: [PATCH] fix: remove obsolete failover detectors after region leader change (#5944) * fix: remove obsolete failover detectors after region leader change * chore: apply suggestions from CR * fix: fix unit tests * fix: fix unit test * fix: failover logic --- src/meta-srv/src/error.rs | 10 +++++- .../src/procedure/region_migration/manager.rs | 8 ++--- src/meta-srv/src/region/supervisor.rs | 36 +++++++++++++++---- tests-integration/tests/region_migration.rs | 2 +- 4 files changed, 43 insertions(+), 13 deletions(-) diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index bd249c5178..d6c4ef4208 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -336,6 +336,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Region's leader peer changed: {}", msg))] + LeaderPeerChanged { + msg: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Invalid arguments: {}", err_msg))] InvalidArguments { err_msg: String, @@ -914,7 +921,8 @@ impl ErrorExt for Error { | Error::ProcedureNotFound { .. } | Error::TooManyPartitions { .. } | Error::TomlFormat { .. } - | Error::HandlerNotFound { .. } => StatusCode::InvalidArguments, + | Error::HandlerNotFound { .. } + | Error::LeaderPeerChanged { .. } => StatusCode::InvalidArguments, Error::LeaseKeyFromUtf8 { .. } | Error::LeaseValueFromUtf8 { .. } | Error::InvalidRegionKeyFromUtf8 { .. } diff --git a/src/meta-srv/src/procedure/region_migration/manager.rs b/src/meta-srv/src/procedure/region_migration/manager.rs index 417881f549..9286bb8778 100644 --- a/src/meta-srv/src/procedure/region_migration/manager.rs +++ b/src/meta-srv/src/procedure/region_migration/manager.rs @@ -267,8 +267,8 @@ impl RegionMigrationManager { ensure!( leader_peer.id == task.from_peer.id, - error::InvalidArgumentsSnafu { - err_msg: format!( + error::LeaderPeerChangedSnafu { + msg: format!( "Region's leader peer({}) is not the `from_peer`({}), region: {}", leader_peer.id, task.from_peer.id, task.region_id ), @@ -507,8 +507,8 @@ mod test { .await; let err = manager.submit_procedure(task).await.unwrap_err(); - assert_matches!(err, error::Error::InvalidArguments { .. }); - assert_eq!(err.to_string(), "Invalid arguments: Region's leader peer(3) is not the `from_peer`(1), region: 4398046511105(1024, 1)"); + assert_matches!(err, error::Error::LeaderPeerChanged { .. }); + assert_eq!(err.to_string(), "Region's leader peer changed: Region's leader peer(3) is not the `from_peer`(1), region: 4398046511105(1024, 1)"); } #[tokio::test] diff --git a/src/meta-srv/src/region/supervisor.rs b/src/meta-srv/src/region/supervisor.rs index 9393474f16..f37face68f 100644 --- a/src/meta-srv/src/region/supervisor.rs +++ b/src/meta-srv/src/region/supervisor.rs @@ -26,7 +26,7 @@ use common_meta::DatanodeId; use common_runtime::JoinHandle; use common_telemetry::{error, info, warn}; use common_time::util::current_time_millis; -use error::Error::{MigrationRunning, TableRouteNotFound}; +use error::Error::{LeaderPeerChanged, MigrationRunning, TableRouteNotFound}; use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionId; use tokio::sync::mpsc::{Receiver, Sender}; @@ -363,15 +363,15 @@ impl RegionSupervisor { for (datanode_id, region_id) in migrating_regions { self.failure_detector.remove(&(datanode_id, region_id)); + warn!( + "Removed region failover for region: {region_id}, datanode: {datanode_id} because it's migrating" + ); } warn!("Detects region failures: {:?}", regions); for (datanode_id, region_id) in regions { - match self.do_failover(datanode_id, region_id).await { - Ok(_) => self.failure_detector.remove(&(datanode_id, region_id)), - Err(err) => { - error!(err; "Failed to execute region failover for region: {region_id}, datanode: {datanode_id}"); - } + if let Err(err) = self.do_failover(datanode_id, region_id).await { + error!(err; "Failed to execute region failover for region: {region_id}, datanode: {datanode_id}"); } } } @@ -421,7 +421,29 @@ impl RegionSupervisor { if let Err(err) = self.region_migration_manager.submit_procedure(task).await { return match err { // Returns Ok if it's running or table is dropped. - MigrationRunning { .. } | TableRouteNotFound { .. } => Ok(()), + MigrationRunning { .. } => { + info!( + "Another region migration is running, skip failover for region: {}, datanode: {}", + region_id, datanode_id + ); + Ok(()) + } + TableRouteNotFound { .. } => { + self.failure_detector.remove(&(datanode_id, region_id)); + info!( + "Table route is not found, the table is dropped, removed failover detector for region: {}, datanode: {}", + region_id, datanode_id + ); + Ok(()) + } + LeaderPeerChanged { .. } => { + self.failure_detector.remove(&(datanode_id, region_id)); + info!( + "Region's leader peer changed, removed failover detector for region: {}, datanode: {}", + region_id, datanode_id + ); + Ok(()) + } err => Err(err), }; }; diff --git a/tests-integration/tests/region_migration.rs b/tests-integration/tests/region_migration.rs index b24850420e..28d6371c8b 100644 --- a/tests-integration/tests/region_migration.rs +++ b/tests-integration/tests/region_migration.rs @@ -847,7 +847,7 @@ pub async fn test_region_migration_incorrect_from_peer( assert!(matches!( err, - meta_srv::error::Error::InvalidArguments { .. } + meta_srv::error::Error::LeaderPeerChanged { .. } )); }