From f3d000f6ecffcd7161b2b3bfd2715da93c601b81 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Thu, 24 Apr 2025 02:19:18 +0800 Subject: [PATCH] feat: track region failover attempts and adjust timeout (#5952) --- src/meta-srv/src/region/supervisor.rs | 35 ++++++++++++++++++--------- 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/src/meta-srv/src/region/supervisor.rs b/src/meta-srv/src/region/supervisor.rs index 6318c0d128..42963fb1fd 100644 --- a/src/meta-srv/src/region/supervisor.rs +++ b/src/meta-srv/src/region/supervisor.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::fmt::Debug; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -25,7 +25,7 @@ use common_meta::leadership_notifier::LeadershipChangeListener; use common_meta::peer::PeerLookupServiceRef; use common_meta::DatanodeId; use common_runtime::JoinHandle; -use common_telemetry::{error, info, warn}; +use common_telemetry::{debug, error, info, warn}; use common_time::util::current_time_millis; use error::Error::{LeaderPeerChanged, MigrationRunning, TableRouteNotFound}; use snafu::{OptionExt, ResultExt}; @@ -208,6 +208,8 @@ pub const DEFAULT_TICK_INTERVAL: Duration = Duration::from_secs(1); pub struct RegionSupervisor { /// Used to detect the failure of regions. failure_detector: RegionFailureDetector, + /// Tracks the number of failovers for each region. + failover_counts: HashMap, /// Receives [Event]s. receiver: Receiver, /// The context of [`SelectorRef`] @@ -293,6 +295,7 @@ impl RegionSupervisor { ) -> Self { Self { failure_detector: RegionFailureDetector::new(options), + failover_counts: HashMap::new(), receiver: event_receiver, selector_context, selector, @@ -336,13 +339,14 @@ impl RegionSupervisor { } } - async fn deregister_failure_detectors(&self, detecting_regions: Vec) { + async fn deregister_failure_detectors(&mut self, detecting_regions: Vec) { for region in detecting_regions { - self.failure_detector.remove(®ion) + self.failure_detector.remove(®ion); + self.failover_counts.remove(®ion); } } - async fn handle_region_failures(&self, mut regions: Vec<(DatanodeId, RegionId)>) { + async fn handle_region_failures(&mut self, mut regions: Vec<(DatanodeId, RegionId)>) { if regions.is_empty() { return; } @@ -365,8 +369,7 @@ impl RegionSupervisor { .collect::>(); for (datanode_id, region_id) in migrating_regions { - self.failure_detector.remove(&(datanode_id, region_id)); - warn!( + debug!( "Removed region failover for region: {region_id}, datanode: {datanode_id} because it's migrating" ); } @@ -386,7 +389,12 @@ impl RegionSupervisor { .context(error::MaintenanceModeManagerSnafu) } - async fn do_failover(&self, datanode_id: DatanodeId, region_id: RegionId) -> Result<()> { + async fn do_failover(&mut self, datanode_id: DatanodeId, region_id: RegionId) -> Result<()> { + let count = *self + .failover_counts + .entry((datanode_id, region_id)) + .and_modify(|count| *count += 1) + .or_insert(1); let from_peer = self .peer_lookup .datanode(datanode_id) @@ -415,11 +423,14 @@ impl RegionSupervisor { ); return Ok(()); } + info!( + "Failover for region: {region_id}, from_peer: {from_peer}, to_peer: {to_peer}, tries: {count}" + ); let task = RegionMigrationProcedureTask { region_id, from_peer, to_peer, - timeout: DEFAULT_REGION_MIGRATION_TIMEOUT, + timeout: DEFAULT_REGION_MIGRATION_TIMEOUT * count, }; if let Err(err) = self.region_migration_manager.submit_procedure(task).await { @@ -433,7 +444,8 @@ impl RegionSupervisor { Ok(()) } TableRouteNotFound { .. } => { - self.failure_detector.remove(&(datanode_id, region_id)); + self.deregister_failure_detectors(vec![(datanode_id, region_id)]) + .await; info!( "Table route is not found, the table is dropped, removed failover detector for region: {}, datanode: {}", region_id, datanode_id @@ -441,7 +453,8 @@ impl RegionSupervisor { Ok(()) } LeaderPeerChanged { .. } => { - self.failure_detector.remove(&(datanode_id, region_id)); + self.deregister_failure_detectors(vec![(datanode_id, region_id)]) + .await; info!( "Region's leader peer changed, removed failover detector for region: {}, datanode: {}", region_id, datanode_id