diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs
index 6c9111dd9c..d659fa7d35 100644
--- a/src/meta-srv/src/metasrv.rs
+++ b/src/meta-srv/src/metasrv.rs
@@ -48,6 +48,7 @@ use serde::{Deserialize, Serialize};
 use servers::export_metrics::ExportMetricsOption;
 use servers::http::HttpOptions;
 use snafu::{OptionExt, ResultExt};
+use store_api::storage::RegionId;
 use table::metadata::TableId;
 use tokio::sync::broadcast::error::RecvError;

@@ -65,7 +66,7 @@ use crate::procedure::wal_prune::manager::WalPruneTickerRef;
 use crate::procedure::ProcedureManagerListenerAdapter;
 use crate::pubsub::{PublisherRef, SubscriptionManagerRef};
 use crate::region::supervisor::RegionSupervisorTickerRef;
-use crate::selector::{Selector, SelectorType};
+use crate::selector::{RegionStatAwareSelector, Selector, SelectorType};
 use crate::service::mailbox::MailboxRef;
 use crate::service::store::cached_kv::LeaderCachedKvBackend;
 use crate::state::{become_follower, become_leader, StateRef};
@@ -338,6 +339,8 @@ pub struct SelectorContext {
 }

 pub type SelectorRef = Arc<dyn Selector<Context = SelectorContext, Output = Vec<Peer>>>;
+pub type RegionStatAwareSelectorRef =
+    Arc<dyn RegionStatAwareSelector<Context = SelectorContext, Output = Vec<(RegionId, Peer)>>>;
 pub type ElectionRef = Arc<dyn Election<Leader = LeaderValue>>;

 pub struct MetaStateHandler {
diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs
index 0c93e4e4c7..dbc22ca2f7 100644
--- a/src/meta-srv/src/metasrv/builder.rs
+++ b/src/meta-srv/src/metasrv/builder.rs
@@ -40,7 +40,7 @@ use common_meta::state_store::KvStateStore;
 use common_meta::wal_options_allocator::{build_kafka_client, build_wal_options_allocator};
 use common_procedure::local::{LocalManager, ManagerConfig};
 use common_procedure::ProcedureManagerRef;
-use common_telemetry::warn;
+use common_telemetry::{info, warn};
 use snafu::{ensure, ResultExt};

 use crate::cache_invalidator::MetasrvCacheInvalidator;
@@ -54,16 +54,16 @@ use crate::handler::region_lease_handler::{CustomizedRegionLeaseRenewerRef, Regi
 use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatMailbox, Pushers};
 use crate::lease::MetaPeerLookupService;
 use crate::metasrv::{
-    ElectionRef, Metasrv, MetasrvInfo, MetasrvOptions, SelectTarget, SelectorContext, SelectorRef,
-    FLOW_ID_SEQ, TABLE_ID_SEQ,
+    ElectionRef, Metasrv, MetasrvInfo, MetasrvOptions, RegionStatAwareSelectorRef, SelectTarget,
+    SelectorContext, SelectorRef, FLOW_ID_SEQ, TABLE_ID_SEQ,
 };
 use crate::procedure::region_migration::manager::RegionMigrationManager;
 use crate::procedure::region_migration::DefaultContextFactory;
 use crate::procedure::wal_prune::manager::{WalPruneManager, WalPruneTicker};
 use crate::procedure::wal_prune::Context as WalPruneContext;
 use crate::region::supervisor::{
-    HeartbeatAcceptor, RegionFailureDetectorControl, RegionSupervisor, RegionSupervisorTicker,
-    DEFAULT_TICK_INTERVAL,
+    HeartbeatAcceptor, RegionFailureDetectorControl, RegionSupervisor, RegionSupervisorSelector,
+    RegionSupervisorTicker, DEFAULT_TICK_INTERVAL,
 };
 use crate::selector::lease_based::LeaseBasedSelector;
 use crate::selector::round_robin::RoundRobinSelector;
@@ -320,13 +320,24 @@ impl MetasrvBuilder {
             ),
         ));
         region_migration_manager.try_start()?;
+        let region_supervisor_selector = plugins
+            .as_ref()
+            .and_then(|plugins| plugins.get::<RegionStatAwareSelectorRef>());
+
+        let supervisor_selector = match region_supervisor_selector {
+            Some(selector) => {
+                info!("Using region stat aware selector");
+                RegionSupervisorSelector::RegionStatAwareSelector(selector)
+            }
+            None => RegionSupervisorSelector::NaiveSelector(selector.clone()),
+        };

         let region_failover_handler = if options.enable_region_failover {
             let region_supervisor = RegionSupervisor::new(
                 rx,
                 options.failure_detector,
                 selector_ctx.clone(),
-                selector.clone(),
+                supervisor_selector,
                 region_migration_manager.clone(),
                 maintenance_mode_manager.clone(),
                 peer_lookup_service.clone(),
diff --git a/src/meta-srv/src/region/supervisor.rs b/src/meta-srv/src/region/supervisor.rs
index 42963fb1fd..26be72fa5e 100644
--- a/src/meta-srv/src/region/supervisor.rs
+++ b/src/meta-srv/src/region/supervisor.rs
@@ -22,20 +22,20 @@ use common_meta::datanode::Stat;
 use common_meta::ddl::{DetectingRegion, RegionFailureDetectorController};
 use common_meta::key::maintenance::MaintenanceModeManagerRef;
 use common_meta::leadership_notifier::LeadershipChangeListener;
-use common_meta::peer::PeerLookupServiceRef;
+use common_meta::peer::{Peer, PeerLookupServiceRef};
 use common_meta::DatanodeId;
 use common_runtime::JoinHandle;
 use common_telemetry::{debug, error, info, warn};
 use common_time::util::current_time_millis;
 use error::Error::{LeaderPeerChanged, MigrationRunning, TableRouteNotFound};
-use snafu::{OptionExt, ResultExt};
+use snafu::{ensure, OptionExt, ResultExt};
 use store_api::storage::RegionId;
 use tokio::sync::mpsc::{Receiver, Sender};
 use tokio::time::{interval, MissedTickBehavior};

 use crate::error::{self, Result};
 use crate::failure_detector::PhiAccrualFailureDetectorOptions;
-use crate::metasrv::{SelectorContext, SelectorRef};
+use crate::metasrv::{RegionStatAwareSelectorRef, SelectTarget, SelectorContext, SelectorRef};
 use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
 use crate::procedure::region_migration::{
     RegionMigrationProcedureTask, DEFAULT_REGION_MIGRATION_TIMEOUT,
@@ -203,6 +203,12 @@ pub type RegionSupervisorRef = Arc<RegionSupervisor>;
 /// The default tick interval.
 pub const DEFAULT_TICK_INTERVAL: Duration = Duration::from_secs(1);

+/// Selector for region supervisor.
+pub enum RegionSupervisorSelector {
+    NaiveSelector(SelectorRef),
+    RegionStatAwareSelector(RegionStatAwareSelectorRef),
+}
+
 /// The [`RegionSupervisor`] is used to detect Region failures
 /// and initiate Region failover upon detection, ensuring uninterrupted region service.
 pub struct RegionSupervisor {
@@ -215,7 +221,7 @@ pub struct RegionSupervisor {
     /// The context of [`SelectorRef`]
     selector_context: SelectorContext,
     /// Candidate node selector.
-    selector: SelectorRef,
+    selector: RegionSupervisorSelector,
     /// Region migration manager.
     region_migration_manager: RegionMigrationManagerRef,
     /// The maintenance mode manager.
@@ -288,7 +294,7 @@ impl RegionSupervisor {
         event_receiver: Receiver<Event>,
         options: PhiAccrualFailureDetectorOptions,
         selector_context: SelectorContext,
-        selector: SelectorRef,
+        selector: RegionSupervisorSelector,
         region_migration_manager: RegionMigrationManagerRef,
         maintenance_mode_manager: MaintenanceModeManagerRef,
         peer_lookup: PeerLookupServiceRef,
@@ -362,6 +368,7 @@ impl RegionSupervisor {
             }
         }

+        // Extract regions that are already migrating (i.e. failover has already been triggered for them).
         let migrating_regions = regions
             .extract_if(.., |(_, region_id)| {
                 self.region_migration_manager.tracker().contains(*region_id)
@@ -374,10 +381,40 @@ impl RegionSupervisor {
             );
         }

-        warn!("Detects region failures: {:?}", regions);
+        if regions.is_empty() {
+            // If all detected regions are already failing over or migrating, just return.
+            return;
+        }
+
+        let mut grouped_regions: HashMap<DatanodeId, Vec<RegionId>> =
+            HashMap::with_capacity(regions.len());
         for (datanode_id, region_id) in regions {
-            if let Err(err) = self.do_failover(datanode_id, region_id).await {
-                error!(err; "Failed to execute region failover for region: {region_id}, datanode: {datanode_id}");
+            grouped_regions
+                .entry(datanode_id)
+                .or_default()
+                .push(region_id);
+        }
+
+        let failed_datanodes = grouped_regions.keys().cloned().collect::<Vec<_>>();
+        for (datanode_id, regions) in grouped_regions {
+            warn!(
+                "Detects region failures on datanode: {}, regions: {:?}",
+                datanode_id, regions
+            );
+            match self
+                .generate_failover_tasks(datanode_id, &regions, &failed_datanodes)
+                .await
+            {
+                Ok(tasks) => {
+                    for (task, count) in tasks {
+                        let region_id = task.region_id;
+                        let datanode_id = task.from_peer.id;
+                        if let Err(err) = self.do_failover(task, count).await {
+                            error!(err; "Failed to execute region failover for region: {}, datanode: {}", region_id, datanode_id);
+                        }
+                    }
+                }
+                Err(err) => error!(err; "Failed to generate failover tasks"),
             }
         }
     }
@@ -389,49 +426,107 @@ impl RegionSupervisor {
             .context(error::MaintenanceModeManagerSnafu)
     }

-    async fn do_failover(&mut self, datanode_id: DatanodeId, region_id: RegionId) -> Result<()> {
-        let count = *self
-            .failover_counts
-            .entry((datanode_id, region_id))
-            .and_modify(|count| *count += 1)
-            .or_insert(1);
+    async fn select_peers(
+        &self,
+        from_peer_id: DatanodeId,
+        regions: &[RegionId],
+        failure_datanodes: &[DatanodeId],
+    ) -> Result<Vec<(RegionId, Peer)>> {
+        let exclude_peer_ids = HashSet::from_iter(failure_datanodes.iter().cloned());
+        match &self.selector {
+            RegionSupervisorSelector::NaiveSelector(selector) => {
+                let opt = SelectorOptions {
+                    min_required_items: regions.len(),
+                    allow_duplication: true,
+                    exclude_peer_ids,
+                };
+                let peers = selector.select(&self.selector_context, opt).await?;
+                ensure!(
+                    peers.len() == regions.len(),
+                    error::NoEnoughAvailableNodeSnafu {
+                        required: regions.len(),
+                        available: peers.len(),
+                        select_target: SelectTarget::Datanode,
+                    }
+                );
+                let region_peers = regions
+                    .iter()
+                    .zip(peers)
+                    .map(|(region_id, peer)| (*region_id, peer))
+                    .collect::<Vec<_>>();
+
+                Ok(region_peers)
+            }
+            RegionSupervisorSelector::RegionStatAwareSelector(selector) => {
+                let peers = selector
+                    .select(
+                        &self.selector_context,
+                        from_peer_id,
+                        regions,
+                        exclude_peer_ids,
+                    )
+                    .await?;
+                ensure!(
+                    peers.len() == regions.len(),
+                    error::NoEnoughAvailableNodeSnafu {
+                        required: regions.len(),
+                        available: peers.len(),
+                        select_target: SelectTarget::Datanode,
+                    }
+                );
+
+                Ok(peers)
+            }
+        }
+    }
+
+    async fn generate_failover_tasks(
+        &mut self,
+        from_peer_id: DatanodeId,
+        regions: &[RegionId],
+        failed_datanodes: &[DatanodeId],
+    ) -> Result<Vec<(RegionMigrationProcedureTask, u32)>> {
+        let mut tasks = Vec::with_capacity(regions.len());
         let from_peer = self
             .peer_lookup
-            .datanode(datanode_id)
+            .datanode(from_peer_id)
             .await
             .context(error::LookupPeerSnafu {
-                peer_id: datanode_id,
+                peer_id: from_peer_id,
             })?
             .context(error::PeerUnavailableSnafu {
-                peer_id: datanode_id,
+                peer_id: from_peer_id,
             })?;

-        let mut peers = self
-            .selector
-            .select(
-                &self.selector_context,
-                SelectorOptions {
-                    min_required_items: 1,
-                    allow_duplication: false,
-                    exclude_peer_ids: HashSet::from([from_peer.id]),
-                },
-            )
+        let region_peers = self
+            .select_peers(from_peer_id, regions, failed_datanodes)
             .await?;
-        let to_peer = peers.remove(0);
-        if to_peer.id == from_peer.id {
-            warn!(
-                "Skip failover for region: {region_id}, from_peer: {from_peer}, trying to failover to the same peer."
-            );
-            return Ok(());
+
+        for (region_id, peer) in region_peers {
+            let count = *self
+                .failover_counts
+                .entry((from_peer_id, region_id))
+                .and_modify(|count| *count += 1)
+                .or_insert(1);
+            let task = RegionMigrationProcedureTask {
+                region_id,
+                from_peer: from_peer.clone(),
+                to_peer: peer,
+                timeout: DEFAULT_REGION_MIGRATION_TIMEOUT * count,
+            };
+            tasks.push((task, count));
         }
+
+        Ok(tasks)
+    }
+
+    async fn do_failover(&mut self, task: RegionMigrationProcedureTask, count: u32) -> Result<()> {
+        let from_peer_id = task.from_peer.id;
+        let region_id = task.region_id;
+
         info!(
-            "Failover for region: {region_id}, from_peer: {from_peer}, to_peer: {to_peer}, tries: {count}"
+            "Failover for region: {}, from_peer: {}, to_peer: {}, timeout: {:?}, tries: {}",
+            task.region_id, task.from_peer, task.to_peer, task.timeout, count
         );
-        let task = RegionMigrationProcedureTask {
-            region_id,
-            from_peer,
-            to_peer,
-            timeout: DEFAULT_REGION_MIGRATION_TIMEOUT * count,
-        };
         if let Err(err) = self.region_migration_manager.submit_procedure(task).await {
             return match err {
@@ -439,25 +534,25 @@ impl RegionSupervisor {
                 MigrationRunning { .. } => {
                     info!(
                         "Another region migration is running, skip failover for region: {}, datanode: {}",
-                        region_id, datanode_id
+                        region_id, from_peer_id
                     );
                     Ok(())
                 }
                 TableRouteNotFound { .. } => {
-                    self.deregister_failure_detectors(vec![(datanode_id, region_id)])
+                    self.deregister_failure_detectors(vec![(from_peer_id, region_id)])
                         .await;
                     info!(
                         "Table route is not found, the table is dropped, removed failover detector for region: {}, datanode: {}",
-                        region_id, datanode_id
+                        region_id, from_peer_id
                     );
                     Ok(())
                 }
                 LeaderPeerChanged { .. } => {
-                    self.deregister_failure_detectors(vec![(datanode_id, region_id)])
+                    self.deregister_failure_detectors(vec![(from_peer_id, region_id)])
                         .await;
                     info!(
                         "Region's leader peer changed, removed failover detector for region: {}, datanode: {}",
-                        region_id, datanode_id
+                        region_id, from_peer_id
                     );
                     Ok(())
                 }
@@ -521,6 +616,7 @@ pub(crate) mod tests {
     use tokio::sync::oneshot;
     use tokio::time::sleep;

+    use super::RegionSupervisorSelector;
     use crate::procedure::region_migration::manager::RegionMigrationManager;
     use crate::procedure::region_migration::test_util::TestingEnv;
     use crate::region::supervisor::{
@@ -548,7 +644,7 @@ pub(crate) mod tests {
             rx,
             Default::default(),
             selector_context,
-            selector,
+            RegionSupervisorSelector::NaiveSelector(selector),
             region_migration_manager,
             maintenance_mode_manager,
             peer_lookup,
diff --git a/src/meta-srv/src/selector.rs b/src/meta-srv/src/selector.rs
index 96fbda241d..92e55a2149 100644
--- a/src/meta-srv/src/selector.rs
+++ b/src/meta-srv/src/selector.rs
@@ -23,6 +23,7 @@ pub mod weighted_choose;
 use std::collections::HashSet;

 use serde::{Deserialize, Serialize};
+use store_api::storage::RegionId;
 use strum::AsRefStr;

 use crate::error;
@@ -36,6 +37,24 @@ pub trait Selector: Send + Sync {
     async fn select(&self, ctx: &Self::Context, opts: SelectorOptions) -> Result<Self::Output>;
 }

+/// A selector that is aware of region statistics.
+///
+/// It selects the best destination peer for a list of regions.
+/// The selection is based on region statistics, such as the region leader's write throughput.
+#[async_trait::async_trait]
+pub trait RegionStatAwareSelector: Send + Sync {
+    type Context;
+    type Output;
+
+    async fn select(
+        &self,
+        ctx: &Self::Context,
+        from_peer_id: u64,
+        region_ids: &[RegionId],
+        exclude_peer_ids: HashSet<u64>,
+    ) -> Result<Self::Output>;
+}
+
 #[derive(Debug)]
 pub struct SelectorOptions {
     /// Minimum number of selected results.
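
For reviewers who want to exercise the new extension point, the sketch below shows one possible out-of-tree implementation of `RegionStatAwareSelector` that satisfies the `RegionStatAwareSelectorRef` alias introduced above. `StaticStatSelector`, its candidate list, and the round-robin placement are illustrative assumptions only, not part of this patch; a real implementation would rank candidates by region statistics instead.

use std::collections::HashSet;

use common_meta::peer::Peer;
use store_api::storage::RegionId;

use crate::error::Result;
use crate::metasrv::SelectorContext;
use crate::selector::RegionStatAwareSelector;

/// Hypothetical selector: spreads the failed regions across a static candidate
/// list, skipping any datanode listed in `exclude_peer_ids`.
pub struct StaticStatSelector {
    candidates: Vec<Peer>,
}

#[async_trait::async_trait]
impl RegionStatAwareSelector for StaticStatSelector {
    type Context = SelectorContext;
    type Output = Vec<(RegionId, Peer)>;

    async fn select(
        &self,
        _ctx: &Self::Context,
        _from_peer_id: u64,
        region_ids: &[RegionId],
        exclude_peer_ids: HashSet<u64>,
    ) -> Result<Self::Output> {
        // Drop candidates that sit on failed datanodes.
        let healthy: Vec<Peer> = self
            .candidates
            .iter()
            .filter(|peer| !exclude_peer_ids.contains(&peer.id))
            .cloned()
            .collect();
        // Assign destinations round-robin; if `healthy` is empty this yields fewer
        // pairs than regions, and the supervisor's `ensure!` check rejects the result.
        Ok(region_ids
            .iter()
            .zip(healthy.into_iter().cycle())
            .map(|(region_id, peer)| (*region_id, peer))
            .collect())
    }
}

Wiring such a selector in then amounts to storing an `Arc` of it in the `Plugins` handed to `MetasrvBuilder` under the `RegionStatAwareSelectorRef` type, since that is exactly what the `plugins.get::<RegionStatAwareSelectorRef>()` lookup in builder.rs resolves; the exact insertion method on `Plugins` is not shown in this diff and is assumed here.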