feat: track region failover attempts and adjust timeout (#5952)

This commit is contained in:
Weny Xu
2025-04-24 02:19:18 +08:00
committed by GitHub
parent 9557b76224
commit f3d000f6ec

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use std::time::Duration;
@@ -25,7 +25,7 @@ use common_meta::leadership_notifier::LeadershipChangeListener;
use common_meta::peer::PeerLookupServiceRef;
use common_meta::DatanodeId;
use common_runtime::JoinHandle;
use common_telemetry::{error, info, warn};
use common_telemetry::{debug, error, info, warn};
use common_time::util::current_time_millis;
use error::Error::{LeaderPeerChanged, MigrationRunning, TableRouteNotFound};
use snafu::{OptionExt, ResultExt};
@@ -208,6 +208,8 @@ pub const DEFAULT_TICK_INTERVAL: Duration = Duration::from_secs(1);
pub struct RegionSupervisor {
/// Used to detect the failure of regions.
failure_detector: RegionFailureDetector,
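/// Tracks the number of failover attempts for each detecting region (datanode, region);
/// the attempt count is used to scale the region migration timeout.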
failover_counts: HashMap<DetectingRegion, u32>,
/// Receives [Event]s.
receiver: Receiver<Event>,
/// The context of [`SelectorRef`]
@@ -293,6 +295,7 @@ impl RegionSupervisor {
) -> Self {
Self {
failure_detector: RegionFailureDetector::new(options),
failover_counts: HashMap::new(),
receiver: event_receiver,
selector_context,
selector,
@@ -336,13 +339,14 @@ impl RegionSupervisor {
}
}
async fn deregister_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
async fn deregister_failure_detectors(&mut self, detecting_regions: Vec<DetectingRegion>) {
for region in detecting_regions {
self.failure_detector.remove(&region)
self.failure_detector.remove(&region);
self.failover_counts.remove(&region);
}
}
async fn handle_region_failures(&self, mut regions: Vec<(DatanodeId, RegionId)>) {
async fn handle_region_failures(&mut self, mut regions: Vec<(DatanodeId, RegionId)>) {
if regions.is_empty() {
return;
}
@@ -365,8 +369,7 @@ impl RegionSupervisor {
.collect::<Vec<_>>();
for (datanode_id, region_id) in migrating_regions {
self.failure_detector.remove(&(datanode_id, region_id));
warn!(
debug!(
"Removed region failover for region: {region_id}, datanode: {datanode_id} because it's migrating"
);
}
@@ -386,7 +389,12 @@ impl RegionSupervisor {
.context(error::MaintenanceModeManagerSnafu)
}
async fn do_failover(&self, datanode_id: DatanodeId, region_id: RegionId) -> Result<()> {
async fn do_failover(&mut self, datanode_id: DatanodeId, region_id: RegionId) -> Result<()> {
let count = *self
.failover_counts
.entry((datanode_id, region_id))
.and_modify(|count| *count += 1)
.or_insert(1);
let from_peer = self
.peer_lookup
.datanode(datanode_id)
@@ -415,11 +423,14 @@ impl RegionSupervisor {
);
return Ok(());
}
info!(
"Failover for region: {region_id}, from_peer: {from_peer}, to_peer: {to_peer}, tries: {count}"
);
let task = RegionMigrationProcedureTask {
region_id,
from_peer,
to_peer,
timeout: DEFAULT_REGION_MIGRATION_TIMEOUT,
timeout: DEFAULT_REGION_MIGRATION_TIMEOUT * count,
};
if let Err(err) = self.region_migration_manager.submit_procedure(task).await {
@@ -433,7 +444,8 @@ impl RegionSupervisor {
Ok(())
}
TableRouteNotFound { .. } => {
self.failure_detector.remove(&(datanode_id, region_id));
self.deregister_failure_detectors(vec![(datanode_id, region_id)])
.await;
info!(
"Table route is not found, the table is dropped, removed failover detector for region: {}, datanode: {}",
region_id, datanode_id
@@ -441,7 +453,8 @@ impl RegionSupervisor {
Ok(())
}
LeaderPeerChanged { .. } => {
self.failure_detector.remove(&(datanode_id, region_id));
self.deregister_failure_detectors(vec![(datanode_id, region_id)])
.await;
info!(
"Region's leader peer changed, removed failover detector for region: {}, datanode: {}",
region_id, datanode_id