Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2026-01-03 20:02:54 +00:00
feat: introduce RegionStatAwareSelector trait (#5990)
* feat: introduce `RegionStatAwareSelector`
* feat: exclude all failed datanodes
* chore: apply suggestions from CR
* chore: apply suggestions from CR
* chore: apply suggestions from CR
* chore: apply suggestions from CR
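For orientation, the sketch below shows what an implementation of the new trait could look like. It is illustrative only and not part of this commit: the struct name and its static candidate list are hypothetical, and it assumes the code lives inside the meta-srv crate so that the crate-internal paths appearing in the hunks below resolve. A production selector would rank candidates using region statistics (for example the region leader's write throughput), which is what the trait is introduced for.

```rust
// Illustrative only -- not part of this commit. Assumes it is compiled inside
// the meta-srv crate so the crate-internal paths from the diff below resolve.
use std::collections::HashSet;

use common_meta::peer::Peer;
use store_api::storage::RegionId;

use crate::error::Result;
use crate::metasrv::SelectorContext;
use crate::selector::RegionStatAwareSelector;

/// A toy selector that spreads failed regions over a fixed candidate list,
/// skipping the failed datanode and every excluded (failed) peer.
/// A real implementation would rank candidates by region statistics instead.
pub struct StaticCandidateSelector {
    candidates: Vec<Peer>,
}

#[async_trait::async_trait]
impl RegionStatAwareSelector for StaticCandidateSelector {
    type Context = SelectorContext;
    type Output = Vec<(RegionId, Peer)>;

    async fn select(
        &self,
        _ctx: &Self::Context,
        from_peer_id: u64,
        region_ids: &[RegionId],
        exclude_peer_ids: HashSet<u64>,
    ) -> Result<Self::Output> {
        // Keep only candidates that are neither the failed datanode nor excluded.
        let usable: Vec<_> = self
            .candidates
            .iter()
            .filter(|p| p.id != from_peer_id && !exclude_peer_ids.contains(&p.id))
            .collect();

        // Round-robin the regions over the usable candidates. If no candidate is
        // usable, this returns an empty list and the supervisor's ensure! check
        // (see the select_peers hunk below) surfaces the shortage as an error.
        Ok(region_ids
            .iter()
            .zip(usable.iter().cycle())
            .map(|(region_id, peer)| (*region_id, (*peer).clone()))
            .collect())
    }
}
```

If the selector returns fewer `(RegionId, Peer)` pairs than regions, the supervisor refuses to proceed via the `NoEnoughAvailableNode` check added in this commit.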
@@ -48,6 +48,7 @@ use serde::{Deserialize, Serialize};
 use servers::export_metrics::ExportMetricsOption;
 use servers::http::HttpOptions;
 use snafu::{OptionExt, ResultExt};
+use store_api::storage::RegionId;
 use table::metadata::TableId;
 use tokio::sync::broadcast::error::RecvError;

@@ -65,7 +66,7 @@ use crate::procedure::wal_prune::manager::WalPruneTickerRef;
 use crate::procedure::ProcedureManagerListenerAdapter;
 use crate::pubsub::{PublisherRef, SubscriptionManagerRef};
 use crate::region::supervisor::RegionSupervisorTickerRef;
-use crate::selector::{Selector, SelectorType};
+use crate::selector::{RegionStatAwareSelector, Selector, SelectorType};
 use crate::service::mailbox::MailboxRef;
 use crate::service::store::cached_kv::LeaderCachedKvBackend;
 use crate::state::{become_follower, become_leader, StateRef};
@@ -338,6 +339,8 @@ pub struct SelectorContext {
 }

 pub type SelectorRef = Arc<dyn Selector<Context = SelectorContext, Output = Vec<Peer>>>;
+pub type RegionStatAwareSelectorRef =
+    Arc<dyn RegionStatAwareSelector<Context = SelectorContext, Output = Vec<(RegionId, Peer)>>>;
 pub type ElectionRef = Arc<dyn Election<Leader = LeaderValue>>;

 pub struct MetaStateHandler {
@@ -40,7 +40,7 @@ use common_meta::state_store::KvStateStore;
 use common_meta::wal_options_allocator::{build_kafka_client, build_wal_options_allocator};
 use common_procedure::local::{LocalManager, ManagerConfig};
 use common_procedure::ProcedureManagerRef;
-use common_telemetry::warn;
+use common_telemetry::{info, warn};
 use snafu::{ensure, ResultExt};

 use crate::cache_invalidator::MetasrvCacheInvalidator;
@@ -54,16 +54,16 @@ use crate::handler::region_lease_handler::{CustomizedRegionLeaseRenewerRef, Regi
 use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatMailbox, Pushers};
 use crate::lease::MetaPeerLookupService;
 use crate::metasrv::{
-    ElectionRef, Metasrv, MetasrvInfo, MetasrvOptions, SelectTarget, SelectorContext, SelectorRef,
-    FLOW_ID_SEQ, TABLE_ID_SEQ,
+    ElectionRef, Metasrv, MetasrvInfo, MetasrvOptions, RegionStatAwareSelectorRef, SelectTarget,
+    SelectorContext, SelectorRef, FLOW_ID_SEQ, TABLE_ID_SEQ,
 };
 use crate::procedure::region_migration::manager::RegionMigrationManager;
 use crate::procedure::region_migration::DefaultContextFactory;
 use crate::procedure::wal_prune::manager::{WalPruneManager, WalPruneTicker};
 use crate::procedure::wal_prune::Context as WalPruneContext;
 use crate::region::supervisor::{
-    HeartbeatAcceptor, RegionFailureDetectorControl, RegionSupervisor, RegionSupervisorTicker,
-    DEFAULT_TICK_INTERVAL,
+    HeartbeatAcceptor, RegionFailureDetectorControl, RegionSupervisor, RegionSupervisorSelector,
+    RegionSupervisorTicker, DEFAULT_TICK_INTERVAL,
 };
 use crate::selector::lease_based::LeaseBasedSelector;
 use crate::selector::round_robin::RoundRobinSelector;
@@ -320,13 +320,24 @@ impl MetasrvBuilder {
             ),
         ));
         region_migration_manager.try_start()?;
+        let region_supervisor_selector = plugins
+            .as_ref()
+            .and_then(|plugins| plugins.get::<RegionStatAwareSelectorRef>());
+
+        let supervisor_selector = match region_supervisor_selector {
+            Some(selector) => {
+                info!("Using region stat aware selector");
+                RegionSupervisorSelector::RegionStatAwareSelector(selector)
+            }
+            None => RegionSupervisorSelector::NaiveSelector(selector.clone()),
+        };
+
         let region_failover_handler = if options.enable_region_failover {
             let region_supervisor = RegionSupervisor::new(
                 rx,
                 options.failure_detector,
                 selector_ctx.clone(),
-                selector.clone(),
+                supervisor_selector,
                 region_migration_manager.clone(),
                 maintenance_mode_manager.clone(),
                 peer_lookup_service.clone(),
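A custom selector reaches the builder code above through the plugins container queried by `plugins.get::<RegionStatAwareSelectorRef>()`. The sketch below shows how such a registration might look; it is not part of this commit, the function name is hypothetical, and it assumes the container is `common_base::Plugins` with an `insert` method as the counterpart of the `get` used above.

```rust
// Illustrative only -- not part of this commit.
use std::sync::Arc;

use common_base::Plugins; // assumed to be the container the builder queries

use crate::metasrv::RegionStatAwareSelectorRef;

// `Plugins::insert` is assumed to exist as the counterpart of `Plugins::get`.
fn register_custom_selector(plugins: &Plugins, selector: RegionStatAwareSelectorRef) {
    // Stored under the `RegionStatAwareSelectorRef` alias, the selector is then found
    // by `plugins.get::<RegionStatAwareSelectorRef>()` above, and the supervisor takes
    // the stat-aware branch instead of `RegionSupervisorSelector::NaiveSelector`.
    plugins.insert::<RegionStatAwareSelectorRef>(selector);
}
```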
@@ -22,20 +22,20 @@ use common_meta::datanode::Stat;
 use common_meta::ddl::{DetectingRegion, RegionFailureDetectorController};
 use common_meta::key::maintenance::MaintenanceModeManagerRef;
 use common_meta::leadership_notifier::LeadershipChangeListener;
-use common_meta::peer::PeerLookupServiceRef;
+use common_meta::peer::{Peer, PeerLookupServiceRef};
 use common_meta::DatanodeId;
 use common_runtime::JoinHandle;
 use common_telemetry::{debug, error, info, warn};
 use common_time::util::current_time_millis;
 use error::Error::{LeaderPeerChanged, MigrationRunning, TableRouteNotFound};
-use snafu::{OptionExt, ResultExt};
+use snafu::{ensure, OptionExt, ResultExt};
 use store_api::storage::RegionId;
 use tokio::sync::mpsc::{Receiver, Sender};
 use tokio::time::{interval, MissedTickBehavior};

 use crate::error::{self, Result};
 use crate::failure_detector::PhiAccrualFailureDetectorOptions;
-use crate::metasrv::{SelectorContext, SelectorRef};
+use crate::metasrv::{RegionStatAwareSelectorRef, SelectTarget, SelectorContext, SelectorRef};
 use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
 use crate::procedure::region_migration::{
     RegionMigrationProcedureTask, DEFAULT_REGION_MIGRATION_TIMEOUT,
@@ -203,6 +203,12 @@ pub type RegionSupervisorRef = Arc<RegionSupervisor>;
 /// The default tick interval.
 pub const DEFAULT_TICK_INTERVAL: Duration = Duration::from_secs(1);

+/// Selector for region supervisor.
+pub enum RegionSupervisorSelector {
+    NaiveSelector(SelectorRef),
+    RegionStatAwareSelector(RegionStatAwareSelectorRef),
+}
+
 /// The [`RegionSupervisor`] is used to detect Region failures
 /// and initiate Region failover upon detection, ensuring uninterrupted region service.
 pub struct RegionSupervisor {
@@ -215,7 +221,7 @@ pub struct RegionSupervisor {
     /// The context of [`SelectorRef`]
     selector_context: SelectorContext,
     /// Candidate node selector.
-    selector: SelectorRef,
+    selector: RegionSupervisorSelector,
     /// Region migration manager.
     region_migration_manager: RegionMigrationManagerRef,
     /// The maintenance mode manager.
@@ -288,7 +294,7 @@ impl RegionSupervisor {
         event_receiver: Receiver<Event>,
         options: PhiAccrualFailureDetectorOptions,
         selector_context: SelectorContext,
-        selector: SelectorRef,
+        selector: RegionSupervisorSelector,
         region_migration_manager: RegionMigrationManagerRef,
         maintenance_mode_manager: MaintenanceModeManagerRef,
         peer_lookup: PeerLookupServiceRef,
@@ -362,6 +368,7 @@ impl RegionSupervisor {
             }
         }

+        // Extracts regions that are migrating(failover), which means they are already being triggered failover.
         let migrating_regions = regions
             .extract_if(.., |(_, region_id)| {
                 self.region_migration_manager.tracker().contains(*region_id)
@@ -374,10 +381,40 @@ impl RegionSupervisor {
             );
         }

-        warn!("Detects region failures: {:?}", regions);
         if regions.is_empty() {
             // If all detected regions are failover or migrating, just return.
             return;
         }

+        let mut grouped_regions: HashMap<u64, Vec<RegionId>> =
+            HashMap::with_capacity(regions.len());
         for (datanode_id, region_id) in regions {
-            if let Err(err) = self.do_failover(datanode_id, region_id).await {
-                error!(err; "Failed to execute region failover for region: {region_id}, datanode: {datanode_id}");
-            }
+            grouped_regions
+                .entry(datanode_id)
+                .or_default()
+                .push(region_id);
         }

+        let failed_datanodes = grouped_regions.keys().cloned().collect::<Vec<_>>();
+        for (datanode_id, regions) in grouped_regions {
+            warn!(
+                "Detects region failures on datanode: {}, regions: {:?}",
+                datanode_id, regions
+            );
+            match self
+                .generate_failover_tasks(datanode_id, &regions, &failed_datanodes)
+                .await
+            {
+                Ok(tasks) => {
+                    for (task, count) in tasks {
+                        let region_id = task.region_id;
+                        let datanode_id = task.from_peer.id;
+                        if let Err(err) = self.do_failover(task, count).await {
+                            error!(err; "Failed to execute region failover for region: {}, datanode: {}", region_id, datanode_id);
+                        }
+                    }
+                }
+                Err(err) => error!(err; "Failed to generate failover tasks"),
+            }
+        }
     }
@@ -389,49 +426,107 @@ impl RegionSupervisor {
             .context(error::MaintenanceModeManagerSnafu)
     }

-    async fn do_failover(&mut self, datanode_id: DatanodeId, region_id: RegionId) -> Result<()> {
-        let count = *self
-            .failover_counts
-            .entry((datanode_id, region_id))
-            .and_modify(|count| *count += 1)
-            .or_insert(1);
+    async fn select_peers(
+        &self,
+        from_peer_id: DatanodeId,
+        regions: &[RegionId],
+        failure_datanodes: &[DatanodeId],
+    ) -> Result<Vec<(RegionId, Peer)>> {
+        let exclude_peer_ids = HashSet::from_iter(failure_datanodes.iter().cloned());
+        match &self.selector {
+            RegionSupervisorSelector::NaiveSelector(selector) => {
+                let opt = SelectorOptions {
+                    min_required_items: regions.len(),
+                    allow_duplication: true,
+                    exclude_peer_ids,
+                };
+                let peers = selector.select(&self.selector_context, opt).await?;
+                ensure!(
+                    peers.len() == regions.len(),
+                    error::NoEnoughAvailableNodeSnafu {
+                        required: regions.len(),
+                        available: peers.len(),
+                        select_target: SelectTarget::Datanode,
+                    }
+                );
+                let region_peers = regions
+                    .iter()
+                    .zip(peers)
+                    .map(|(region_id, peer)| (*region_id, peer))
+                    .collect::<Vec<_>>();
+
+                Ok(region_peers)
+            }
+            RegionSupervisorSelector::RegionStatAwareSelector(selector) => {
+                let peers = selector
+                    .select(
+                        &self.selector_context,
+                        from_peer_id,
+                        regions,
+                        exclude_peer_ids,
+                    )
+                    .await?;
+                ensure!(
+                    peers.len() == regions.len(),
+                    error::NoEnoughAvailableNodeSnafu {
+                        required: regions.len(),
+                        available: peers.len(),
+                        select_target: SelectTarget::Datanode,
+                    }
+                );
+
+                Ok(peers)
+            }
+        }
+    }
+
+    async fn generate_failover_tasks(
+        &mut self,
+        from_peer_id: DatanodeId,
+        regions: &[RegionId],
+        failed_datanodes: &[DatanodeId],
+    ) -> Result<Vec<(RegionMigrationProcedureTask, u32)>> {
+        let mut tasks = Vec::with_capacity(regions.len());
         let from_peer = self
             .peer_lookup
-            .datanode(datanode_id)
+            .datanode(from_peer_id)
             .await
             .context(error::LookupPeerSnafu {
-                peer_id: datanode_id,
+                peer_id: from_peer_id,
             })?
             .context(error::PeerUnavailableSnafu {
-                peer_id: datanode_id,
+                peer_id: from_peer_id,
             })?;
-        let mut peers = self
-            .selector
-            .select(
-                &self.selector_context,
-                SelectorOptions {
-                    min_required_items: 1,
-                    allow_duplication: false,
-                    exclude_peer_ids: HashSet::from([from_peer.id]),
-                },
-            )
+        let region_peers = self
+            .select_peers(from_peer_id, regions, failed_datanodes)
             .await?;
-        let to_peer = peers.remove(0);
-        if to_peer.id == from_peer.id {
-            warn!(
-                "Skip failover for region: {region_id}, from_peer: {from_peer}, trying to failover to the same peer."
-            );
-            return Ok(());
-        }

+        for (region_id, peer) in region_peers {
+            let count = *self
+                .failover_counts
+                .entry((from_peer_id, region_id))
+                .and_modify(|count| *count += 1)
+                .or_insert(1);
+            let task = RegionMigrationProcedureTask {
+                region_id,
+                from_peer: from_peer.clone(),
+                to_peer: peer,
+                timeout: DEFAULT_REGION_MIGRATION_TIMEOUT * count,
+            };
+            tasks.push((task, count));
+        }
+
+        Ok(tasks)
+    }
+
+    async fn do_failover(&mut self, task: RegionMigrationProcedureTask, count: u32) -> Result<()> {
+        let from_peer_id = task.from_peer.id;
+        let region_id = task.region_id;
+
         info!(
-            "Failover for region: {region_id}, from_peer: {from_peer}, to_peer: {to_peer}, tries: {count}"
+            "Failover for region: {}, from_peer: {}, to_peer: {}, timeout: {:?}, tries: {}",
+            task.region_id, task.from_peer, task.to_peer, task.timeout, count
         );
-        let task = RegionMigrationProcedureTask {
-            region_id,
-            from_peer,
-            to_peer,
-            timeout: DEFAULT_REGION_MIGRATION_TIMEOUT * count,
-        };

         if let Err(err) = self.region_migration_manager.submit_procedure(task).await {
             return match err {
@@ -439,25 +534,25 @@ impl RegionSupervisor {
                 MigrationRunning { .. } => {
                     info!(
                         "Another region migration is running, skip failover for region: {}, datanode: {}",
-                        region_id, datanode_id
+                        region_id, from_peer_id
                     );
                     Ok(())
                 }
                 TableRouteNotFound { .. } => {
-                    self.deregister_failure_detectors(vec![(datanode_id, region_id)])
+                    self.deregister_failure_detectors(vec![(from_peer_id, region_id)])
                         .await;
                     info!(
                         "Table route is not found, the table is dropped, removed failover detector for region: {}, datanode: {}",
-                        region_id, datanode_id
+                        region_id, from_peer_id
                     );
                     Ok(())
                 }
                 LeaderPeerChanged { .. } => {
-                    self.deregister_failure_detectors(vec![(datanode_id, region_id)])
+                    self.deregister_failure_detectors(vec![(from_peer_id, region_id)])
                         .await;
                     info!(
                         "Region's leader peer changed, removed failover detector for region: {}, datanode: {}",
-                        region_id, datanode_id
+                        region_id, from_peer_id
                     );
                     Ok(())
                 }
@@ -521,6 +616,7 @@ pub(crate) mod tests {
     use tokio::sync::oneshot;
     use tokio::time::sleep;

+    use super::RegionSupervisorSelector;
    use crate::procedure::region_migration::manager::RegionMigrationManager;
    use crate::procedure::region_migration::test_util::TestingEnv;
    use crate::region::supervisor::{
@@ -548,7 +644,7 @@ pub(crate) mod tests {
             rx,
             Default::default(),
             selector_context,
-            selector,
+            RegionSupervisorSelector::NaiveSelector(selector),
             region_migration_manager,
             maintenance_mode_manager,
             peer_lookup,
@@ -23,6 +23,7 @@ pub mod weighted_choose;
 use std::collections::HashSet;

 use serde::{Deserialize, Serialize};
+use store_api::storage::RegionId;
 use strum::AsRefStr;

 use crate::error;
@@ -36,6 +37,24 @@ pub trait Selector: Send + Sync {
     async fn select(&self, ctx: &Self::Context, opts: SelectorOptions) -> Result<Self::Output>;
 }

+/// A selector that is aware of region statistics.
+///
+/// It selects the best destination peer for a list of regions.
+/// The selection is based on the region statistics, such as the region leader's write throughput.
+#[async_trait::async_trait]
+pub trait RegionStatAwareSelector: Send + Sync {
+    type Context;
+    type Output;
+
+    async fn select(
+        &self,
+        ctx: &Self::Context,
+        from_peer_id: u64,
+        region_ids: &[RegionId],
+        exclude_peer_ids: HashSet<u64>,
+    ) -> Result<Self::Output>;
+}
+
 #[derive(Debug)]
 pub struct SelectorOptions {
     /// Minimum number of selected results.
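As a closing note, the two selection paths differ mainly in their call shape: the existing `Selector` is options-driven and returns bare peers that the supervisor zips with regions itself, while the new trait sees the failed datanode and the concrete regions and returns explicit `(RegionId, Peer)` assignments. The sketch below contrasts the two calls; it is illustrative only, the function and its placeholder arguments (empty exclude sets, caller-supplied `from_peer_id`) are hypothetical, and the module paths are assumed from the `use` statements in the hunks above.

```rust
// Illustrative only -- not part of this commit. Module paths are assumed
// from the `use` statements shown in the diff above.
use std::collections::HashSet;

use store_api::storage::RegionId;

use crate::error::Result;
use crate::metasrv::{RegionStatAwareSelectorRef, SelectorContext, SelectorRef};
use crate::selector::SelectorOptions;

async fn compare_call_shapes(
    ctx: &SelectorContext,
    naive: SelectorRef,
    stat_aware: RegionStatAwareSelectorRef,
    from_peer_id: u64,
    regions: &[RegionId],
) -> Result<()> {
    // Options-driven: ask for `regions.len()` peers, then pair them with the
    // regions by position (this is what the supervisor's naive branch does).
    let peers = naive
        .select(
            ctx,
            SelectorOptions {
                min_required_items: regions.len(),
                allow_duplication: true,
                exclude_peer_ids: HashSet::new(),
            },
        )
        .await?;

    // Region-driven: the selector sees the failed datanode and the concrete
    // regions, and returns explicit (RegionId, Peer) assignments directly.
    let assignments = stat_aware
        .select(ctx, from_peer_id, regions, HashSet::new())
        .await?;

    let _ = (peers, assignments);
    Ok(())
}
```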