feat: implement automatic region failure detector registrations (#6370)
* feat: implement automatic region failure detector registrations
* chore: remove unused error
* test: add more tests
* feat: add `region_failure_detector_initialization_delay` option
* chore: update config.md
* chore: apply suggestions from CR
* chore: update config.md

Signed-off-by: WenyXu <wenymedia@gmail.com>
@@ -325,6 +325,7 @@
| `selector` | String | `round_robin` | Datanode selector type.<br/>- `round_robin` (default value)<br/>- `lease_based`<br/>- `load_based`<br/>For details, please see "https://docs.greptime.com/developer-guide/metasrv/selector". |
| `use_memory_store` | Bool | `false` | Store data in memory. |
| `enable_region_failover` | Bool | `false` | Whether to enable region failover.<br/>This feature is only available on GreptimeDB running on cluster mode and<br/>- Using Remote WAL<br/>- Using shared storage (e.g., s3). |
| `region_failure_detector_initialization_delay` | String | `10m` | Delay before initializing region failure detectors.<br/>This delay helps prevent premature initialization of region failure detectors in cases where<br/>cluster maintenance mode is enabled right after metasrv starts, especially when the cluster<br/>is not deployed via the recommended GreptimeDB Operator. Without this delay, early detector registration<br/>may trigger unnecessary region failovers during datanode startup. |
| `allow_region_failover_on_local_wal` | Bool | `false` | Whether to allow region failover on local WAL.<br/>**This option is not recommended to be set to true, because it may lead to data loss during failover.** |
| `node_max_idle_time` | String | `24hours` | Max allowed idle time before removing node info from metasrv memory. |
| `enable_telemetry` | Bool | `true` | Whether to enable greptimedb telemetry. Enabled by default. |
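The duration-typed options above (`10m`, `24hours`) are human-readable strings. A minimal sketch of how such strings map to `std::time::Duration`, assuming the `humantime` crate (which backs the `humantime_serde` attribute used later in this change) does the parsing:

```rust
use std::time::Duration;

fn main() {
    // "10m" is ten minutes, the default initialization delay.
    let delay = humantime::parse_duration("10m").expect("valid duration");
    assert_eq!(delay, Duration::from_secs(10 * 60));

    // "24hours" is the default `node_max_idle_time`.
    let idle = humantime::parse_duration("24hours").expect("valid duration");
    assert_eq!(idle, Duration::from_secs(24 * 60 * 60));
}
```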
@@ -43,6 +43,13 @@ use_memory_store = false
## - Using shared storage (e.g., s3).
enable_region_failover = false

## Delay before initializing region failure detectors.
## This delay helps prevent premature initialization of region failure detectors in cases where
## cluster maintenance mode is enabled right after metasrv starts, especially when the cluster
## is not deployed via the recommended GreptimeDB Operator. Without this delay, early detector registration
## may trigger unnecessary region failovers during datanode startup.
region_failure_detector_initialization_delay = '10m'

## Whether to allow region failover on local WAL.
## **This option is not recommended to be set to true, because it may lead to data loss during failover.**
allow_region_failover_on_local_wal = false
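The new key is deserialized through `humantime_serde`, so the TOML string form and the in-memory `Duration` stay in sync. A self-contained sketch under stated assumptions: the `FailoverOptions` struct below is a hypothetical mirror of just these two keys (not the real `MetasrvOptions`), and the `serde`, `toml`, and `humantime-serde` crates are available:

```rust
use std::time::Duration;

use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct FailoverOptions {
    enable_region_failover: bool,
    // Same attribute as the real option uses for human-readable durations.
    #[serde(with = "humantime_serde")]
    region_failure_detector_initialization_delay: Duration,
}

fn main() {
    let config = r#"
enable_region_failover = true
region_failure_detector_initialization_delay = '10m'
"#;
    let options: FailoverOptions = toml::from_str(config).expect("valid config");
    assert!(options.enable_region_failover);
    assert_eq!(
        options.region_failure_detector_initialization_delay,
        Duration::from_secs(600)
    );
}
```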
@@ -48,6 +48,11 @@ impl TableRouteKey {
    pub fn new(table_id: TableId) -> Self {
        Self { table_id }
    }

    /// Returns the range prefix of the table route key.
    pub fn range_prefix() -> Vec<u8> {
        format!("{}/", TABLE_ROUTE_PREFIX).into_bytes()
    }
}

#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
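`range_prefix()` exists so the supervisor can range-scan every table route key in the KV backend (see `initialize_all` further down). A standalone sketch of the idea, using plain byte keys instead of the real `KvBackend` and `RangeRequest` types; the prefix constant here is a hypothetical stand-in:

```rust
// Hypothetical stand-in for the real prefix constant in common_meta's
// table_route module; only the "<prefix>/" shape matters here.
const TABLE_ROUTE_PREFIX: &str = "__table_route";

fn range_prefix() -> Vec<u8> {
    format!("{}/", TABLE_ROUTE_PREFIX).into_bytes()
}

fn main() {
    let keys: Vec<Vec<u8>> = vec![
        b"__table_route/1024".to_vec(),
        b"__table_route/1025".to_vec(),
        b"__other_prefix/1".to_vec(),
    ];

    // A prefix range scan keeps only keys that live under `__table_route/`.
    let prefix = range_prefix();
    let matched = keys
        .iter()
        .filter(|key| key.starts_with(prefix.as_slice()))
        .count();
    assert_eq!(matched, 2);
}
```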
@@ -54,14 +54,6 @@ pub enum Error {
        peer_id: u64,
    },

-    #[snafu(display("Failed to lookup peer: {}", peer_id))]
-    LookupPeer {
-        #[snafu(implicit)]
-        location: Location,
-        source: common_meta::error::Error,
-        peer_id: u64,
-    },
-
    #[snafu(display("Another migration procedure is running for region: {}", region_id))]
    MigrationRunning {
        #[snafu(implicit)]

@@ -1033,7 +1025,6 @@ impl ErrorExt for Error {
            }

            Error::Other { source, .. } => source.status_code(),
-            Error::LookupPeer { source, .. } => source.status_code(),
            Error::NoEnoughAvailableNode { .. } => StatusCode::RuntimeResourcesExhausted,

            #[cfg(feature = "pg_kvbackend")]
@@ -110,6 +110,14 @@ pub struct MetasrvOptions {
    pub use_memory_store: bool,
    /// Whether to enable region failover.
    pub enable_region_failover: bool,
    /// Delay before initializing region failure detectors.
    ///
    /// This delay helps prevent premature initialization of region failure detectors in cases where
    /// cluster maintenance mode is enabled right after metasrv starts, especially when the cluster
    /// is not deployed via the recommended GreptimeDB Operator. Without this delay, early detector registration
    /// may trigger unnecessary region failovers during datanode startup.
    #[serde(with = "humantime_serde")]
    pub region_failure_detector_initialization_delay: Duration,
    /// Whether to allow region failover on local WAL.
    ///
    /// If it's true, the region failover will be allowed even if the local WAL is used.

@@ -219,6 +227,7 @@ impl Default for MetasrvOptions {
            selector: SelectorType::default(),
            use_memory_store: false,
            enable_region_failover: false,
            region_failure_detector_initialization_delay: Duration::from_secs(10 * 60),
            allow_region_failover_on_local_wal: false,
            grpc: GrpcOptions {
                bind_addr: format!("127.0.0.1:{}", DEFAULT_METASRV_ADDR_PORT),
@@ -64,7 +64,7 @@ use crate::procedure::wal_prune::manager::{WalPruneManager, WalPruneTicker};
use crate::procedure::wal_prune::Context as WalPruneContext;
use crate::region::supervisor::{
    HeartbeatAcceptor, RegionFailureDetectorControl, RegionSupervisor, RegionSupervisorSelector,
-    RegionSupervisorTicker, DEFAULT_TICK_INTERVAL,
+    RegionSupervisorTicker, DEFAULT_INITIALIZATION_RETRY_PERIOD, DEFAULT_TICK_INTERVAL,
};
use crate::selector::lease_based::LeaseBasedSelector;
use crate::selector::round_robin::RoundRobinSelector;

@@ -299,6 +299,8 @@ impl MetasrvBuilder {
                Arc::new(RegionFailureDetectorControl::new(tx.clone())) as _,
                Some(Arc::new(RegionSupervisorTicker::new(
                    DEFAULT_TICK_INTERVAL,
                    options.region_failure_detector_initialization_delay,
                    DEFAULT_INITIALIZATION_RETRY_PERIOD,
                    tx.clone(),
                ))),
            )

@@ -341,6 +343,7 @@ impl MetasrvBuilder {
                region_migration_manager.clone(),
                maintenance_mode_manager.clone(),
                peer_lookup_service.clone(),
                leader_cached_kv_backend.clone(),
            );

            Some(RegionFailureHandler::new(
@@ -23,7 +23,7 @@ use common_meta::key::table_route::TableRouteValue;
use common_meta::peer::Peer;
use common_meta::rpc::router::RegionRoute;
use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId};
-use common_telemetry::{error, info};
+use common_telemetry::{error, info, warn};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::RegionId;
use table::table_name::TableName;

@@ -253,10 +253,12 @@ impl RegionMigrationManager {
    }

    /// Throws an error if `leader_peer` is not the `from_peer`.
    ///
    /// If `from_peer` is unknown, use the leader peer as the `from_peer`.
    fn verify_region_leader_peer(
        &self,
        region_route: &RegionRoute,
-        task: &RegionMigrationProcedureTask,
+        task: &mut RegionMigrationProcedureTask,
    ) -> Result<()> {
        let leader_peer = region_route
            .leader_peer

@@ -275,6 +277,15 @@ impl RegionMigrationManager {
            }
        );

        if task.from_peer.addr.is_empty() {
            warn!(
                "The `from_peer` is unknown, use the leader peer({}) as the `from_peer`, region: {}",
                leader_peer, task.region_id
            );
            // The peer id is the same as the leader peer id.
            task.from_peer = leader_peer.clone();
        }

        Ok(())
    }

@@ -300,7 +311,7 @@ impl RegionMigrationManager {
    /// Submits a new region migration procedure.
    pub async fn submit_procedure(
        &self,
-        task: RegionMigrationProcedureTask,
+        mut task: RegionMigrationProcedureTask,
    ) -> Result<Option<ProcedureId>> {
        let Some(guard) = self.insert_running_procedure(&task) else {
            return error::MigrationRunningSnafu {

@@ -333,7 +344,7 @@ impl RegionMigrationManager {
                .fail();
        }

-        self.verify_region_leader_peer(&region_route, &task)?;
+        self.verify_region_leader_peer(&region_route, &mut task)?;
        self.verify_region_follower_peers(&region_route, &task)?;
        let table_info = self.retrieve_table_info(region_id).await?;
        let TableName {

@@ -341,12 +352,6 @@ impl RegionMigrationManager {
            schema_name,
            ..
        } = table_info.table_name();
-        METRIC_META_REGION_MIGRATION_DATANODES
-            .with_label_values(&["src", &task.from_peer.id.to_string()])
-            .inc();
-        METRIC_META_REGION_MIGRATION_DATANODES
-            .with_label_values(&["desc", &task.to_peer.id.to_string()])
-            .inc();
        let RegionMigrationProcedureTask {
            region_id,
            from_peer,

@@ -377,6 +382,12 @@ impl RegionMigrationManager {
                    return;
                }
            };
+        METRIC_META_REGION_MIGRATION_DATANODES
+            .with_label_values(&["src", &task.from_peer.id.to_string()])
+            .inc();
+        METRIC_META_REGION_MIGRATION_DATANODES
+            .with_label_values(&["desc", &task.to_peer.id.to_string()])
+            .inc();

        if let Err(e) = watcher::wait(watcher).await {
            error!(e; "Failed to wait region migration procedure {procedure_id} for {task}");
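The `verify_region_leader_peer` change above lets a failover task be submitted when only the `from_peer` id is known; the address is then filled in from the route's leader. A small standalone illustration of that fallback, using hypothetical `Peer` and task stand-ins rather than the real GreptimeDB types:

```rust
#[derive(Debug, Clone, PartialEq)]
struct Peer {
    id: u64,
    addr: String,
}

struct MigrationTask {
    from_peer: Peer,
}

/// If the task's `from_peer` has no address, adopt the route's leader peer
/// (the id is the same as the leader peer id).
fn fill_unknown_from_peer(task: &mut MigrationTask, leader_peer: &Peer) {
    if task.from_peer.addr.is_empty() {
        task.from_peer = leader_peer.clone();
    }
}

fn main() {
    let leader = Peer { id: 1, addr: "10.0.0.1:3001".to_string() };
    // Only the id is known when the task originates from a failure detector.
    let mut task = MigrationTask { from_peer: Peer { id: 1, addr: String::new() } };

    fill_unknown_from_peer(&mut task, &leader);
    assert_eq!(task.from_peer, leader);
}
```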
@@ -15,23 +15,30 @@
use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::sync::{Arc, Mutex};
-use std::time::Duration;
+use std::time::{Duration, Instant};

use async_trait::async_trait;
use common_meta::datanode::Stat;
use common_meta::ddl::{DetectingRegion, RegionFailureDetectorController};
use common_meta::key::maintenance::MaintenanceModeManagerRef;
use common_meta::key::table_route::{TableRouteKey, TableRouteValue};
use common_meta::key::{MetadataKey, MetadataValue};
use common_meta::kv_backend::KvBackendRef;
use common_meta::leadership_notifier::LeadershipChangeListener;
use common_meta::peer::{Peer, PeerLookupServiceRef};
use common_meta::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use common_meta::rpc::store::RangeRequest;
use common_meta::DatanodeId;
use common_runtime::JoinHandle;
use common_telemetry::{debug, error, info, warn};
use common_time::util::current_time_millis;
use error::Error::{LeaderPeerChanged, MigrationRunning, RegionMigrated, TableRouteNotFound};
-use snafu::{ensure, OptionExt, ResultExt};
use futures::{StreamExt, TryStreamExt};
+use snafu::{ensure, ResultExt};
use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};
-use tokio::time::{interval, MissedTickBehavior};
use tokio::sync::oneshot;
+use tokio::time::{interval, interval_at, MissedTickBehavior};

use crate::error::{self, Result};
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
@@ -70,6 +77,9 @@ impl From<&Stat> for DatanodeHeartbeat {
///
/// Variants:
/// - `Tick`: This event is used to trigger region failure detection periodically.
/// - `InitializeAllRegions`: This event is used to initialize all region failure detectors.
/// - `RegisterFailureDetectors`: This event is used to register failure detectors for regions.
/// - `DeregisterFailureDetectors`: This event is used to deregister failure detectors for regions.
/// - `HeartbeatArrived`: This event presents the metasrv received [`DatanodeHeartbeat`] from the datanodes.
/// - `Clear`: This event is used to reset the state of the supervisor, typically used
///   when a system-wide reset or reinitialization is needed.

@@ -78,6 +88,7 @@ impl From<&Stat> for DatanodeHeartbeat {
/// of the supervisor during tests.
pub(crate) enum Event {
    Tick,
    InitializeAllRegions(tokio::sync::oneshot::Sender<()>),
    RegisterFailureDetectors(Vec<DetectingRegion>),
    DeregisterFailureDetectors(Vec<DetectingRegion>),
    HeartbeatArrived(DatanodeHeartbeat),
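The new `InitializeAllRegions` variant carries a oneshot sender so the ticker can learn whether initialization actually completed and stop retrying. A minimal, generic sketch of that ack handshake using plain tokio channels (not the real supervisor types):

```rust
use tokio::sync::{mpsc, oneshot};

// A stripped-down event with an acknowledgement channel, mirroring the shape
// of `Event::InitializeAllRegions(tokio::sync::oneshot::Sender<()>)`.
enum Event {
    InitializeAllRegions(oneshot::Sender<()>),
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Event>(8);

    // The "supervisor" side: acknowledge only when initialization succeeds.
    tokio::spawn(async move {
        while let Some(Event::InitializeAllRegions(ack)) = rx.recv().await {
            // ... initialize detectors here ...
            let _ = ack.send(()); // ignore the error if the requester is gone
        }
    });

    // The "ticker" side: retry until an ack arrives, then stop.
    loop {
        let (ack_tx, ack_rx) = oneshot::channel();
        if tx.send(Event::InitializeAllRegions(ack_tx)).await.is_err() {
            break; // receiver dropped
        }
        if ack_rx.await.is_ok() {
            println!("all region failure detectors initialized");
            break;
        }
    }
}
```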
@@ -102,6 +113,7 @@ impl Debug for Event {
            Self::Tick => write!(f, "Tick"),
            Self::HeartbeatArrived(arg0) => f.debug_tuple("HeartbeatArrived").field(arg0).finish(),
            Self::Clear => write!(f, "Clear"),
            Self::InitializeAllRegions(_) => write!(f, "InspectAndRegisterRegions"),
            Self::RegisterFailureDetectors(arg0) => f
                .debug_tuple("RegisterFailureDetectors")
                .field(arg0)

@@ -127,6 +139,12 @@ pub struct RegionSupervisorTicker {
    /// The interval of tick.
    tick_interval: Duration,

    /// The delay before initializing all region failure detectors.
    initialization_delay: Duration,

    /// The retry period for initializing all region failure detectors.
    initialization_retry_period: Duration,

    /// Sends [Event]s.
    sender: Sender<Event>,
}

@@ -149,10 +167,21 @@ impl LeadershipChangeListener for RegionSupervisorTicker {
}

impl RegionSupervisorTicker {
-    pub(crate) fn new(tick_interval: Duration, sender: Sender<Event>) -> Self {
+    pub(crate) fn new(
+        tick_interval: Duration,
+        initialization_delay: Duration,
+        initialization_retry_period: Duration,
+        sender: Sender<Event>,
+    ) -> Self {
        info!(
            "RegionSupervisorTicker is created, tick_interval: {:?}, initialization_delay: {:?}, initialization_retry_period: {:?}",
            tick_interval, initialization_delay, initialization_retry_period
        );
        Self {
            tick_handle: Mutex::new(None),
            tick_interval,
            initialization_delay,
            initialization_retry_period,
            sender,
        }
    }

@@ -163,15 +192,39 @@ impl RegionSupervisorTicker {
        if handle.is_none() {
            let sender = self.sender.clone();
            let tick_interval = self.tick_interval;
            let initialization_delay = self.initialization_delay;

            let mut initialization_interval = interval_at(
                tokio::time::Instant::now() + initialization_delay,
                self.initialization_retry_period,
            );
            initialization_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
            common_runtime::spawn_global(async move {
                loop {
                    initialization_interval.tick().await;
                    let (tx, rx) = oneshot::channel();
                    if sender.send(Event::InitializeAllRegions(tx)).await.is_err() {
                        info!("EventReceiver is dropped, region failure detectors initialization loop is stopped");
                        break;
                    }
                    if rx.await.is_ok() {
                        info!("All region failure detectors are initialized.");
                        break;
                    }
                }
            });

            let sender = self.sender.clone();
            let ticker_loop = tokio::spawn(async move {
-                let mut interval = interval(tick_interval);
-                interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
+                let mut tick_interval = interval(tick_interval);
+                tick_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

                if let Err(err) = sender.send(Event::Clear).await {
                    warn!(err; "EventReceiver is dropped, failed to send Event::Clear");
                    return;
                }
                loop {
-                    interval.tick().await;
+                    tick_interval.tick().await;
                    if sender.send(Event::Tick).await.is_err() {
                        info!("EventReceiver is dropped, tick loop is stopped");
                        break;
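The initialization loop above relies on `tokio::time::interval_at` so the first tick fires only after the configured delay, and on `MissedTickBehavior::Skip` so a slow iteration does not trigger a burst of catch-up ticks. A small standalone sketch of that timing behavior, with arbitrary short durations standing in for the real `10m` delay and `60s` retry period:

```rust
use std::time::Duration;

use tokio::time::{interval_at, Instant, MissedTickBehavior};

#[tokio::main]
async fn main() {
    let delay = Duration::from_millis(200); // stands in for the 10m default
    let retry_period = Duration::from_millis(100); // stands in for the 60s retry period

    // First tick at now + delay, then one tick per retry period.
    let mut init_interval = interval_at(Instant::now() + delay, retry_period);
    init_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

    let start = Instant::now();
    init_interval.tick().await; // completes after ~200ms, not immediately
    assert!(start.elapsed() >= delay);

    init_interval.tick().await; // subsequent ticks follow the retry period
    assert!(start.elapsed() >= delay + retry_period);
}
```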
@@ -202,6 +255,8 @@ pub type RegionSupervisorRef = Arc<RegionSupervisor>;

/// The default tick interval.
pub const DEFAULT_TICK_INTERVAL: Duration = Duration::from_secs(1);
/// The default initialization retry period.
pub const DEFAULT_INITIALIZATION_RETRY_PERIOD: Duration = Duration::from_secs(60);

/// Selector for region supervisor.
pub enum RegionSupervisorSelector {

@@ -228,6 +283,8 @@ pub struct RegionSupervisor {
    maintenance_mode_manager: MaintenanceModeManagerRef,
    /// Peer lookup service
    peer_lookup: PeerLookupServiceRef,
    /// The kv backend.
    kv_backend: KvBackendRef,
}

/// Controller for managing failure detectors for regions.

@@ -290,6 +347,7 @@ impl RegionSupervisor {
        tokio::sync::mpsc::channel(1024)
    }

    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        event_receiver: Receiver<Event>,
        options: PhiAccrualFailureDetectorOptions,

@@ -298,6 +356,7 @@ impl RegionSupervisor {
        region_migration_manager: RegionMigrationManagerRef,
        maintenance_mode_manager: MaintenanceModeManagerRef,
        peer_lookup: PeerLookupServiceRef,
        kv_backend: KvBackendRef,
    ) -> Self {
        Self {
            failure_detector: RegionFailureDetector::new(options),

@@ -308,6 +367,7 @@ impl RegionSupervisor {
            region_migration_manager,
            maintenance_mode_manager,
            peer_lookup,
            kv_backend,
        }
    }

@@ -315,6 +375,26 @@ impl RegionSupervisor {
    pub(crate) async fn run(&mut self) {
        while let Some(event) = self.receiver.recv().await {
            match event {
                Event::InitializeAllRegions(sender) => {
                    match self.is_maintenance_mode_enabled().await {
                        Ok(false) => {}
                        Ok(true) => {
                            warn!("Skipping initialize all regions since maintenance mode is enabled.");
                            continue;
                        }
                        Err(err) => {
                            error!(err; "Failed to check maintenance mode during initialize all regions.");
                            continue;
                        }
                    }

                    if let Err(err) = self.initialize_all().await {
                        error!(err; "Failed to initialize all regions.");
                    } else {
                        // Ignore the error.
                        let _ = sender.send(());
                    }
                }
                Event::Tick => {
                    let regions = self.detect_region_failure();
                    self.handle_region_failures(regions).await;
@@ -336,6 +416,59 @@ impl RegionSupervisor {
        info!("RegionSupervisor is stopped!");
    }

    async fn initialize_all(&self) -> Result<()> {
        let now = Instant::now();
        let regions = self.regions();
        let req = RangeRequest::new().with_prefix(TableRouteKey::range_prefix());
        let stream = PaginationStream::new(self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, |kv| {
            TableRouteKey::from_bytes(&kv.key).map(|v| (v.table_id, kv.value))
        })
        .into_stream();

        let mut stream = stream
            .map_ok(|(_, value)| {
                TableRouteValue::try_from_raw_value(&value)
                    .context(error::TableMetadataManagerSnafu)
            })
            .boxed();
        let mut detecting_regions = Vec::new();
        while let Some(route) = stream
            .try_next()
            .await
            .context(error::TableMetadataManagerSnafu)?
        {
            let route = route?;
            if !route.is_physical() {
                continue;
            }

            let physical_table_route = route.into_physical_table_route();
            physical_table_route
                .region_routes
                .iter()
                .for_each(|region_route| {
                    if !regions.contains(&region_route.region.id) {
                        if let Some(leader_peer) = &region_route.leader_peer {
                            detecting_regions.push((leader_peer.id, region_route.region.id));
                        }
                    }
                });
        }

        let num_detecting_regions = detecting_regions.len();
        if !detecting_regions.is_empty() {
            self.register_failure_detectors(detecting_regions).await;
        }

        info!(
            "Initialize {} region failure detectors, elapsed: {:?}",
            num_detecting_regions,
            now.elapsed()
        );

        Ok(())
    }

    async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
        let ts_millis = current_time_millis();
        for region in detecting_regions {
@@ -497,12 +630,10 @@ impl RegionSupervisor {
            .peer_lookup
            .datanode(from_peer_id)
            .await
-            .context(error::LookupPeerSnafu {
-                peer_id: from_peer_id,
-            })?
-            .context(error::PeerUnavailableSnafu {
-                peer_id: from_peer_id,
-            })?;
+            .ok()
+            .flatten()
+            .unwrap_or_else(|| Peer::empty(from_peer_id));

        let region_peers = self
            .select_peers(from_peer_id, regions, failed_datanodes)
            .await?;

@@ -599,6 +730,14 @@ impl RegionSupervisor {
            .collect::<Vec<_>>()
    }

    /// Returns all regions that registered in the failure detector.
    fn regions(&self) -> HashSet<RegionId> {
        self.failure_detector
            .iter()
            .map(|e| e.region_ident().1)
            .collect::<HashSet<_>>()
    }

    /// Updates the state of corresponding failure detectors.
    fn on_heartbeat_arrived(&self, heartbeat: DatanodeHeartbeat) {
        for region_id in heartbeat.regions {
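The peer lookup above no longer fails the failover path when the datanode address cannot be resolved: the `Result<Option<Peer>, _>` is collapsed with `.ok().flatten()` and falls back to a peer that only carries the id. A tiny standalone illustration of that combinator chain, with a hypothetical `Peer` stand-in and stub lookup:

```rust
#[derive(Debug, Clone, PartialEq)]
struct Peer {
    id: u64,
    addr: String,
}

impl Peer {
    // Mirrors the idea of `Peer::empty(id)`: the id is known, the address is not.
    fn empty(id: u64) -> Self {
        Self { id, addr: String::new() }
    }
}

fn lookup(id: u64) -> Result<Option<Peer>, String> {
    // Pretend the lookup backend had no record for this datanode.
    let _ = id;
    Ok(None)
}

fn main() {
    let from_peer_id = 42;
    // Err(_) -> None, Ok(None) -> None, Ok(Some(p)) -> Some(p); otherwise fall back.
    let from_peer = lookup(from_peer_id)
        .ok()
        .flatten()
        .unwrap_or_else(|| Peer::empty(from_peer_id));
    assert_eq!(from_peer, Peer::empty(42));
}
```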
@@ -618,13 +757,22 @@ impl RegionSupervisor {
#[cfg(test)]
pub(crate) mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    use common_meta::ddl::test_util::{
        test_create_logical_table_task, test_create_physical_table_task,
    };
    use common_meta::ddl::RegionFailureDetectorController;
-    use common_meta::key::maintenance;
    use common_meta::key::table_route::{
        LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteValue,
    };
+    use common_meta::key::{maintenance, TableMetadataManager};
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use common_meta::test_util::NoopPeerLookupService;
    use common_telemetry::info;
    use common_time::util::current_time_millis;
    use rand::Rng;
    use store_api::storage::RegionId;

@@ -654,6 +802,7 @@ pub(crate) mod tests {
            Arc::new(maintenance::MaintenanceModeManager::new(env.kv_backend()));
        let peer_lookup = Arc::new(NoopPeerLookupService);
        let (tx, rx) = RegionSupervisor::channel();
        let kv_backend = env.kv_backend();

        (
            RegionSupervisor::new(

@@ -664,6 +813,7 @@ pub(crate) mod tests {
                region_migration_manager,
                maintenance_mode_manager,
                peer_lookup,
                kv_backend,
            ),
            tx,
        )

@@ -748,6 +898,8 @@ pub(crate) mod tests {
        let ticker = RegionSupervisorTicker {
            tick_handle: Mutex::new(None),
            tick_interval: Duration::from_millis(10),
            initialization_delay: Duration::from_millis(100),
            initialization_retry_period: Duration::from_millis(100),
            sender: tx,
        };
        // It's ok if we start the ticker again.

@@ -757,11 +909,116 @@ pub(crate) mod tests {
        ticker.stop();
        assert!(!rx.is_empty());
        while let Ok(event) = rx.try_recv() {
-            assert_matches!(event, Event::Tick | Event::Clear);
+            assert_matches!(
+                event,
+                Event::Tick | Event::Clear | Event::InitializeAllRegions(_)
+            );
        }
    }
}

    #[tokio::test]
    async fn test_initialize_all_regions_event_handling() {
        common_telemetry::init_default_ut_logging();
        let (tx, mut rx) = tokio::sync::mpsc::channel(128);
        let ticker = RegionSupervisorTicker {
            tick_handle: Mutex::new(None),
            tick_interval: Duration::from_millis(1000),
            initialization_delay: Duration::from_millis(50),
            initialization_retry_period: Duration::from_millis(50),
            sender: tx,
        };
        ticker.start();
        sleep(Duration::from_millis(60)).await;
        let handle = tokio::spawn(async move {
            let mut counter = 0;
            while let Some(event) = rx.recv().await {
                if let Event::InitializeAllRegions(tx) = event {
                    if counter == 0 {
                        // Ignore the first event
                        counter += 1;
                        continue;
                    }
                    tx.send(()).unwrap();
                    info!("Responded initialize all regions event");
                    break;
                }
            }
            rx
        });

        let rx = handle.await.unwrap();
        for _ in 0..3 {
            sleep(Duration::from_millis(100)).await;
            assert!(rx.is_empty());
        }
    }

    #[tokio::test]
    async fn test_initialize_all_regions() {
        common_telemetry::init_default_ut_logging();
        let (mut supervisor, sender) = new_test_supervisor();
        let table_metadata_manager = TableMetadataManager::new(supervisor.kv_backend.clone());

        // Create a physical table metadata
        let table_id = 1024;
        let mut create_physical_table_task = test_create_physical_table_task("my_physical_table");
        create_physical_table_task.set_table_id(table_id);
        let table_info = create_physical_table_task.table_info;
        let table_route = PhysicalTableRouteValue::new(vec![RegionRoute {
            region: Region {
                id: RegionId::new(table_id, 0),
                ..Default::default()
            },
            leader_peer: Some(Peer::empty(1)),
            ..Default::default()
        }]);
        let table_route_value = TableRouteValue::Physical(table_route);
        table_metadata_manager
            .create_table_metadata(table_info, table_route_value, HashMap::new())
            .await
            .unwrap();

        // Create a logical table metadata
        let logical_table_id = 1025;
        let mut test_create_logical_table_task = test_create_logical_table_task("my_logical_table");
        test_create_logical_table_task.set_table_id(logical_table_id);
        let table_info = test_create_logical_table_task.table_info;
        let table_route = LogicalTableRouteValue::new(1024, vec![RegionId::new(1025, 0)]);
        let table_route_value = TableRouteValue::Logical(table_route);
        table_metadata_manager
            .create_table_metadata(table_info, table_route_value, HashMap::new())
            .await
            .unwrap();
        tokio::spawn(async move { supervisor.run().await });
        let (tx, rx) = oneshot::channel();
        sender.send(Event::InitializeAllRegions(tx)).await.unwrap();
        assert!(rx.await.is_ok());

        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        assert_eq!(detector.len(), 1);
        assert!(detector.contains(&(1, RegionId::new(1024, 0))));
    }

    #[tokio::test]
    async fn test_initialize_all_regions_with_maintenance_mode() {
        common_telemetry::init_default_ut_logging();
        let (mut supervisor, sender) = new_test_supervisor();

        supervisor
            .maintenance_mode_manager
            .set_maintenance_mode()
            .await
            .unwrap();
        tokio::spawn(async move { supervisor.run().await });
        let (tx, rx) = oneshot::channel();
        sender.send(Event::InitializeAllRegions(tx)).await.unwrap();
        // The sender is dropped, so the receiver will receive an error.
        assert!(rx.await.is_err());
    }

    #[tokio::test]
    async fn test_region_failure_detector_controller() {
        let (mut supervisor, sender) = new_test_supervisor();