refactor: migrate region failover implementation to region migration (#4172)

* refactor: migrate region failover implementation to region migration

* fix: use HEARTBEAT_INTERVAL_MILLIS as lease secs

* fix: return false if leader is downgraded

* fix: only remove failure detector after submitting procedure successfully

* feat: ignore dropped region

* refactor: retrieve table routes in batches

* refactor: disable region failover on local WAL implementation

* fix: move the guard into procedure

* feat: use real peer addr

* feat: use interval instead of sleep

* chore: rename `HeartbeatSender` to `HeartbeatAcceptor`

* chore: apply suggestions from CR

* chore: reduce duplicate code

* chore: apply suggestions from CR

* feat: lookup peer addr

* chore: add comments

* chore: apply suggestions from CR

* chore: apply suggestions from CR
Weny Xu (committed by GitHub), 2024-06-25 19:58:17 +08:00
parent f5ac158605
commit 8cbe7166b0
19 changed files with 963 additions and 628 deletions


@@ -135,7 +135,7 @@ use crate::rpc::store::BatchDeleteRequest;
use crate::DatanodeId;
pub const NAME_PATTERN: &str = r"[a-zA-Z_:-][a-zA-Z0-9_:\-\.]*";
pub const MAINTENANCE_KEY: &str = "maintenance";
pub const MAINTENANCE_KEY: &str = "__maintenance";
const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table";
pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info";


@@ -33,10 +33,11 @@ use crate::kv_backend::KvBackendRef;
use crate::node_manager::{
Datanode, DatanodeRef, Flownode, FlownodeRef, NodeManager, NodeManagerRef,
};
use crate::peer::{Peer, StandalonePeerLookupService};
use crate::peer::{Peer, PeerLookupService, StandalonePeerLookupService};
use crate::region_keeper::MemoryRegionKeeper;
use crate::sequence::SequenceBuilder;
use crate::wal_options_allocator::WalOptionsAllocator;
use crate::{ClusterId, DatanodeId, FlownodeId};
#[async_trait::async_trait]
pub trait MockDatanodeHandler: Sync + Send + Clone {
@@ -183,3 +184,16 @@ pub fn new_ddl_context_with_kv_backend(
peer_lookup_service: Arc::new(StandalonePeerLookupService::new()),
}
}
pub struct NoopPeerLookupService;
#[async_trait::async_trait]
impl PeerLookupService for NoopPeerLookupService {
async fn datanode(&self, _cluster_id: ClusterId, id: DatanodeId) -> Result<Option<Peer>> {
Ok(Some(Peer::empty(id)))
}
async fn flownode(&self, _cluster_id: ClusterId, id: FlownodeId) -> Result<Option<Peer>> {
Ok(Some(Peer::empty(id)))
}
}
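`NoopPeerLookupService` resolves every id to an empty peer without touching a kv backend, which is what the region supervisor tests later in this commit rely on. A minimal, test-style sketch of that behavior (the test name and the `#[tokio::test]` wrapper are illustrative, not part of this change):

#[tokio::test]
async fn noop_lookup_resolves_to_empty_peer() {
    // Assumes `NoopPeerLookupService`, `PeerLookupService`, and `Peer` are in scope,
    // as in the module above.
    let lookup = NoopPeerLookupService;
    let peer = lookup.datanode(0, 42).await.unwrap().unwrap();
    assert_eq!(peer.id, 42);
}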


@@ -87,6 +87,11 @@ impl WalOptionsAllocator {
}
}
}
/// Returns true if it's the remote WAL.
pub fn is_remote_wal(&self) -> bool {
matches!(&self, WalOptionsAllocator::Kafka(_))
}
}
/// Allocates a wal options for each region. The allocated wal options is encoded immediately.
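The new `is_remote_wal` helper is what lets the metasrv builder reject region failover on a local WAL (see the builder hunk further below). A minimal sketch of that gate; the function name is hypothetical, and `error::UnexpectedSnafu` / `Result` are assumed to be metasrv's, as used later in this commit:

fn ensure_region_failover_supported(
    wal_options_allocator: &WalOptionsAllocator,
    enable_region_failover: bool,
) -> Result<()> {
    // Region failover relies on the remote (Kafka) WAL; reject the combination otherwise.
    if enable_region_failover && !wal_options_allocator.is_remote_wal() {
        return error::UnexpectedSnafu {
            violated: "Region failover is not supported in the local WAL implementation!",
        }
        .fail();
    }
    Ok(())
}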


@@ -39,6 +39,14 @@ pub enum Error {
peer_id: u64,
},
#[snafu(display("Failed to lookup peer: {}", peer_id))]
LookupPeer {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
peer_id: u64,
},
#[snafu(display("Another migration procedure is running for region: {}", region_id))]
MigrationRunning {
#[snafu(implicit)]
@@ -972,6 +980,7 @@ impl ErrorExt for Error {
}
Error::Other { source, .. } => source.status_code(),
Error::LookupPeer { source, .. } => source.status_code(),
}
}


@@ -63,7 +63,7 @@ pub(crate) struct PhiAccrualFailureDetector {
last_heartbeat_millis: Option<i64>,
}
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct PhiAccrualFailureDetectorOptions {
pub threshold: f32,
@@ -195,7 +195,7 @@ fn phi(time_diff: i64, mean: f64, std_deviation: f64) -> f64 {
/// It is capped by the number of samples specified in `max_sample_size`.
///
/// The stats (mean, variance, std_deviation) are not defined for empty HeartbeatHistory.
#[derive(Clone)]
#[derive(Debug, Clone)]
struct HeartbeatHistory {
/// Number of samples to use for calculation of mean and standard deviation of inter-arrival
/// times.


@@ -12,48 +12,25 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod runner;
use std::sync::Arc;
use api::v1::meta::{HeartbeatRequest, Role};
use async_trait::async_trait;
use common_meta::RegionIdent;
use common_telemetry::info;
use crate::error::Result;
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::handler::failure_handler::runner::{FailureDetectControl, FailureDetectRunner};
use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::{Context, ElectionRef};
use crate::procedure::region_failover::RegionFailoverManager;
pub(crate) struct DatanodeHeartbeat {
region_idents: Vec<RegionIdent>,
heartbeat_time: i64,
}
use crate::metasrv::Context;
use crate::region::supervisor::{DatanodeHeartbeat, HeartbeatAcceptor, RegionSupervisor};
pub struct RegionFailureHandler {
failure_detect_runner: FailureDetectRunner,
heartbeat_acceptor: HeartbeatAcceptor,
}
impl RegionFailureHandler {
pub(crate) async fn try_new(
election: Option<ElectionRef>,
region_failover_manager: Arc<RegionFailoverManager>,
failure_detector_options: PhiAccrualFailureDetectorOptions,
) -> Result<Self> {
region_failover_manager.try_start()?;
let mut failure_detect_runner = FailureDetectRunner::new(
election,
region_failover_manager.clone(),
failure_detector_options,
);
failure_detect_runner.start().await;
Ok(Self {
failure_detect_runner,
})
pub(crate) fn new(mut region_supervisor: RegionSupervisor) -> Self {
let heartbeat_acceptor = region_supervisor.heartbeat_acceptor();
info!("Starting region supervisor");
common_runtime::spawn_bg(async move { region_supervisor.run().await });
Self { heartbeat_acceptor }
}
}
@@ -66,38 +43,16 @@ impl HeartbeatHandler for RegionFailureHandler {
async fn handle(
&self,
_: &HeartbeatRequest,
ctx: &mut Context,
_ctx: &mut Context,
acc: &mut HeartbeatAccumulator,
) -> Result<HandleControl> {
if ctx.is_infancy {
self.failure_detect_runner
.send_control(FailureDetectControl::Purge)
.await;
}
let Some(stat) = acc.stat.as_ref() else {
return Ok(HandleControl::Continue);
};
let heartbeat = DatanodeHeartbeat {
region_idents: stat
.region_stats
.iter()
.map(|x| {
let region_id = x.id;
RegionIdent {
cluster_id: stat.cluster_id,
datanode_id: stat.id,
table_id: region_id.table_id(),
region_number: region_id.region_number(),
engine: x.engine.clone(),
}
})
.collect(),
heartbeat_time: stat.timestamp_millis,
};
self.failure_detect_runner.send_heartbeat(heartbeat).await;
self.heartbeat_acceptor
.accept(DatanodeHeartbeat::from(stat))
.await;
Ok(HandleControl::Continue)
}
@@ -105,34 +60,28 @@ impl HeartbeatHandler for RegionFailureHandler {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use api::v1::meta::HeartbeatRequest;
use common_catalog::consts::default_engine;
use common_meta::key::MAINTENANCE_KEY;
use store_api::region_engine::RegionRole;
use store_api::storage::RegionId;
use tokio::sync::oneshot;
use super::*;
use crate::handler::failure_handler::RegionFailureHandler;
use crate::handler::node_stat::{RegionStat, Stat};
use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::builder::MetasrvBuilder;
use crate::test_util::create_region_failover_manager;
use crate::region::supervisor::tests::new_test_supervisor;
use crate::region::supervisor::Event;
#[tokio::test(flavor = "multi_thread")]
#[tokio::test]
async fn test_handle_heartbeat() {
let region_failover_manager = create_region_failover_manager();
let failure_detector_options = PhiAccrualFailureDetectorOptions::default();
let handler =
RegionFailureHandler::try_new(None, region_failover_manager, failure_detector_options)
.await
.unwrap();
let supervisor = new_test_supervisor();
let sender = supervisor.sender();
let handler = RegionFailureHandler::new(supervisor);
let req = &HeartbeatRequest::default();
let builder = MetasrvBuilder::new();
let metasrv = builder.build().await.unwrap();
let mut ctx = metasrv.new_ctx();
ctx.is_infancy = false;
let acc = &mut HeartbeatAccumulator::default();
fn new_region_stat(region_id: u64) -> RegionStat {
RegionStat {
@@ -153,48 +102,9 @@ mod tests {
});
handler.handle(req, &mut ctx, acc).await.unwrap();
let dump = handler.failure_detect_runner.dump().await;
assert_eq!(dump.iter().collect::<Vec<_>>().len(), 3);
// infancy makes heartbeats re-accumulated
ctx.is_infancy = true;
acc.stat = None;
handler.handle(req, &mut ctx, acc).await.unwrap();
let dump = handler.failure_detect_runner.dump().await;
assert_eq!(dump.iter().collect::<Vec<_>>().len(), 0);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_maintenance_mode() {
let region_failover_manager = create_region_failover_manager();
let kv_backend = region_failover_manager.create_context().kv_backend.clone();
let _handler = RegionFailureHandler::try_new(
None,
region_failover_manager.clone(),
PhiAccrualFailureDetectorOptions::default(),
)
.await
.unwrap();
let kv_req = common_meta::rpc::store::PutRequest {
key: Vec::from(MAINTENANCE_KEY),
value: vec![],
prev_kv: false,
};
let _ = kv_backend.put(kv_req.clone()).await.unwrap();
assert_matches!(
region_failover_manager.is_maintenance_mode().await,
Ok(true)
);
let _ = kv_backend
.delete(MAINTENANCE_KEY.as_bytes(), false)
.await
.unwrap();
assert_matches!(
region_failover_manager.is_maintenance_mode().await,
Ok(false)
);
let (tx, rx) = oneshot::channel();
sender.send(Event::Dump(tx)).await.unwrap();
let detector = rx.await.unwrap();
assert_eq!(detector.iter().collect::<Vec<_>>().len(), 3);
}
}


@@ -1,411 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ops::DerefMut;
use std::sync::Arc;
use std::time::{Duration, Instant};
use common_meta::RegionIdent;
use common_telemetry::{error, info, warn};
use common_time::util::current_time_millis;
use dashmap::mapref::multiple::RefMulti;
use dashmap::DashMap;
use tokio::sync::mpsc;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::task::JoinHandle;
use crate::failure_detector::{PhiAccrualFailureDetector, PhiAccrualFailureDetectorOptions};
use crate::handler::failure_handler::DatanodeHeartbeat;
use crate::metasrv::ElectionRef;
use crate::procedure::region_failover::RegionFailoverManager;
pub(crate) enum FailureDetectControl {
Purge,
#[cfg(test)]
Dump(tokio::sync::oneshot::Sender<FailureDetectorContainer>),
}
pub(crate) struct FailureDetectRunner {
election: Option<ElectionRef>,
region_failover_manager: Arc<RegionFailoverManager>,
failure_detector_options: PhiAccrualFailureDetectorOptions,
heartbeat_tx: Sender<DatanodeHeartbeat>,
heartbeat_rx: Option<Receiver<DatanodeHeartbeat>>,
control_tx: Sender<FailureDetectControl>,
control_rx: Option<Receiver<FailureDetectControl>>,
receiver_handle: Option<JoinHandle<()>>,
runner_handle: Option<JoinHandle<()>>,
}
impl FailureDetectRunner {
pub(super) fn new(
election: Option<ElectionRef>,
region_failover_manager: Arc<RegionFailoverManager>,
failure_detector_options: PhiAccrualFailureDetectorOptions,
) -> Self {
let (heartbeat_tx, heartbeat_rx) = mpsc::channel::<DatanodeHeartbeat>(1024);
let (control_tx, control_rx) = mpsc::channel::<FailureDetectControl>(1024);
Self {
election,
region_failover_manager,
failure_detector_options,
heartbeat_tx,
heartbeat_rx: Some(heartbeat_rx),
control_tx,
control_rx: Some(control_rx),
receiver_handle: None,
runner_handle: None,
}
}
pub(crate) async fn send_heartbeat(&self, heartbeat: DatanodeHeartbeat) {
if let Err(e) = self.heartbeat_tx.send(heartbeat).await {
error!(e; "FailureDetectRunner is stop receiving heartbeats")
}
}
pub(crate) async fn send_control(&self, control: FailureDetectControl) {
if let Err(e) = self.control_tx.send(control).await {
error!(e; "FailureDetectRunner is stop receiving controls")
}
}
pub(crate) async fn start(&mut self) {
let failure_detectors = Arc::new(FailureDetectorContainer {
detectors: DashMap::new(),
options: self.failure_detector_options.clone(),
});
self.start_with(failure_detectors).await
}
async fn start_with(&mut self, failure_detectors: Arc<FailureDetectorContainer>) {
let Some(mut heartbeat_rx) = self.heartbeat_rx.take() else {
return;
};
let Some(mut control_rx) = self.control_rx.take() else {
return;
};
let container = failure_detectors.clone();
let receiver_handle = common_runtime::spawn_bg(async move {
loop {
tokio::select! {
Some(control) = control_rx.recv() => {
match control {
FailureDetectControl::Purge => container.clear(),
#[cfg(test)]
FailureDetectControl::Dump(tx) => {
// Drain any heartbeats that are not handled before dump.
while let Ok(heartbeat) = heartbeat_rx.try_recv() {
for ident in heartbeat.region_idents {
let mut detector = container.get_failure_detector(ident);
detector.heartbeat(heartbeat.heartbeat_time);
}
}
let _ = tx.send(container.dump());
}
}
}
Some(heartbeat) = heartbeat_rx.recv() => {
for ident in heartbeat.region_idents {
let mut detector = container.get_failure_detector(ident);
detector.heartbeat(heartbeat.heartbeat_time);
}
}
else => {
warn!("Both control and heartbeat senders are closed, quit receiving.");
break;
}
}
}
});
self.receiver_handle = Some(receiver_handle);
let election = self.election.clone();
let region_failover_manager = self.region_failover_manager.clone();
let runner_handle = common_runtime::spawn_bg(async move {
async fn maybe_region_failover(
failure_detectors: &Arc<FailureDetectorContainer>,
region_failover_manager: &Arc<RegionFailoverManager>,
) {
match region_failover_manager.is_maintenance_mode().await {
Ok(false) => {}
Ok(true) => {
info!("Maintenance mode is enabled, skip failover");
return;
}
Err(err) => {
error!(err; "Failed to check maintenance mode");
return;
}
}
let failed_regions = failure_detectors
.iter()
.filter_map(|e| {
// Intentionally not place `current_time_millis()` out of the iteration.
// The failure detection determination should be happened "just in time",
// i.e., failed or not has to be compared with the most recent "now".
// Besides, it might reduce the false positive of failure detection,
// because during the iteration, heartbeats are coming in as usual,
// and the `phi`s are still updating.
if !e.failure_detector().is_available(current_time_millis()) {
Some(e.region_ident().clone())
} else {
None
}
})
.collect::<Vec<RegionIdent>>();
for r in failed_regions {
if let Err(e) = region_failover_manager.do_region_failover(&r).await {
error!(e; "Failed to do region failover for {r}");
} else {
// Now that we know the region is starting to do failover, remove it
// from the failure detectors, avoiding the failover procedure to be
// triggered again.
// If the region is back alive (the failover procedure runs successfully),
// it will be added back to the failure detectors again.
failure_detectors.remove(&r);
}
}
}
loop {
let start = Instant::now();
let is_leader = election.as_ref().map(|x| x.is_leader()).unwrap_or(true);
if is_leader {
maybe_region_failover(&failure_detectors, &region_failover_manager).await;
}
let elapsed = Instant::now().duration_since(start);
if let Some(sleep) = Duration::from_secs(1).checked_sub(elapsed) {
tokio::time::sleep(sleep).await;
} // else the elapsed time is exceeding one second, we should continue working immediately
}
});
self.runner_handle = Some(runner_handle);
}
#[cfg(test)]
pub(crate) async fn dump(&self) -> FailureDetectorContainer {
let (tx, rx) = tokio::sync::oneshot::channel();
self.send_control(FailureDetectControl::Dump(tx)).await;
rx.await.unwrap()
}
}
impl Drop for FailureDetectRunner {
fn drop(&mut self) {
if let Some(handle) = self.receiver_handle.take() {
handle.abort();
info!("Heartbeat receiver in FailureDetectRunner is stopped.");
}
if let Some(handle) = self.runner_handle.take() {
handle.abort();
info!("Failure detector in FailureDetectRunner is stopped.");
}
}
}
pub(crate) struct FailureDetectorEntry<'a> {
e: RefMulti<'a, RegionIdent, PhiAccrualFailureDetector>,
}
impl FailureDetectorEntry<'_> {
fn region_ident(&self) -> &RegionIdent {
self.e.key()
}
fn failure_detector(&self) -> &PhiAccrualFailureDetector {
self.e.value()
}
}
pub(crate) struct FailureDetectorContainer {
options: PhiAccrualFailureDetectorOptions,
detectors: DashMap<RegionIdent, PhiAccrualFailureDetector>,
}
impl FailureDetectorContainer {
fn get_failure_detector(
&self,
ident: RegionIdent,
) -> impl DerefMut<Target = PhiAccrualFailureDetector> + '_ {
self.detectors
.entry(ident)
.or_insert_with(|| PhiAccrualFailureDetector::from_options(self.options.clone()))
}
pub(crate) fn iter(&self) -> Box<dyn Iterator<Item = FailureDetectorEntry> + '_> {
Box::new(
self.detectors
.iter()
.map(move |e| FailureDetectorEntry { e }),
) as _
}
fn remove(&self, ident: &RegionIdent) {
let _ = self.detectors.remove(ident);
}
fn clear(&self) {
self.detectors.clear()
}
#[cfg(test)]
fn dump(&self) -> FailureDetectorContainer {
let mut m = DashMap::with_capacity(self.detectors.len());
m.extend(
self.detectors
.iter()
.map(|x| (x.key().clone(), x.value().clone())),
);
Self {
detectors: m,
options: self.options.clone(),
}
}
}
#[cfg(test)]
mod tests {
use rand::Rng;
use super::*;
use crate::test_util::create_region_failover_manager;
#[test]
fn test_default_failure_detector_container() {
let container = FailureDetectorContainer {
detectors: DashMap::new(),
options: PhiAccrualFailureDetectorOptions::default(),
};
let ident = RegionIdent {
table_id: 1,
cluster_id: 3,
datanode_id: 2,
region_number: 1,
engine: "mito2".to_string(),
};
let _ = container.get_failure_detector(ident.clone());
assert!(container.detectors.contains_key(&ident));
{
let mut iter = container.iter();
let _ = iter.next().unwrap();
assert!(iter.next().is_none());
}
container.clear();
assert!(container.detectors.is_empty());
}
#[tokio::test(flavor = "multi_thread")]
async fn test_control() {
let container = FailureDetectorContainer {
detectors: DashMap::new(),
options: PhiAccrualFailureDetectorOptions::default(),
};
let ident = RegionIdent {
table_id: 1,
cluster_id: 3,
datanode_id: 2,
region_number: 1,
engine: "mito2".to_string(),
};
let _ = container.get_failure_detector(ident.clone());
let region_failover_manager = create_region_failover_manager();
let failure_detector_options = PhiAccrualFailureDetectorOptions::default();
let mut runner =
FailureDetectRunner::new(None, region_failover_manager, failure_detector_options);
runner.start_with(Arc::new(container)).await;
let dump = runner.dump().await;
assert_eq!(dump.iter().collect::<Vec<_>>().len(), 1);
runner.send_control(FailureDetectControl::Purge).await;
let dump = runner.dump().await;
assert_eq!(dump.iter().collect::<Vec<_>>().len(), 0);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_heartbeat() {
let region_failover_manager = create_region_failover_manager();
let failure_detector_options = PhiAccrualFailureDetectorOptions::default();
let mut runner =
FailureDetectRunner::new(None, region_failover_manager, failure_detector_options);
runner.start().await;
// Generate 2000 heartbeats start from now. Heartbeat interval is one second, plus some random millis.
fn generate_heartbeats(datanode_id: u64, region_ids: Vec<u32>) -> Vec<DatanodeHeartbeat> {
let mut rng = rand::thread_rng();
let start = current_time_millis();
(0..2000)
.map(|i| DatanodeHeartbeat {
region_idents: region_ids
.iter()
.map(|&region_number| RegionIdent {
table_id: 0,
cluster_id: 1,
datanode_id,
region_number,
engine: "mito2".to_string(),
})
.collect(),
heartbeat_time: start + i * 1000 + rng.gen_range(0..100),
})
.collect::<Vec<_>>()
}
let heartbeats = generate_heartbeats(100, vec![1, 2, 3]);
let last_heartbeat_time = heartbeats.last().unwrap().heartbeat_time;
for heartbeat in heartbeats {
runner.send_heartbeat(heartbeat).await;
}
let dump = runner.dump().await;
let failure_detectors = dump.iter().collect::<Vec<_>>();
assert_eq!(failure_detectors.len(), 3);
failure_detectors.iter().for_each(|e| {
let fd = e.failure_detector();
let acceptable_heartbeat_pause_millis = fd.acceptable_heartbeat_pause_millis() as i64;
let start = last_heartbeat_time;
// Within the "acceptable_heartbeat_pause_millis" period, phi is zero ...
for i in 1..=acceptable_heartbeat_pause_millis / 1000 {
let now = start + i * 1000;
assert_eq!(fd.phi(now), 0.0);
}
// ... then in less than two seconds, phi is above the threshold.
// The same effect can be seen in the diagrams in Akka's document.
let now = start + acceptable_heartbeat_pause_millis + 1000;
assert!(fd.phi(now) < fd.threshold() as _);
let now = start + acceptable_heartbeat_pause_millis + 2000;
assert!(fd.phi(now) > fd.threshold() as _);
});
}
}


@@ -54,6 +54,7 @@ use crate::lease::lookup_datanode_peer;
use crate::lock::DistLockRef;
use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
use crate::pubsub::{PublisherRef, SubscriptionManagerRef};
use crate::region::supervisor::RegionSupervisorTickerRef;
use crate::selector::{Selector, SelectorType};
use crate::service::mailbox::MailboxRef;
use crate::service::store::cached_kv::LeaderCachedKvBackend;
@@ -266,6 +267,7 @@ pub struct MetaStateHandler {
subscribe_manager: Option<SubscriptionManagerRef>,
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
region_supervisor_ticker: Option<RegionSupervisorTickerRef>,
state: StateRef,
}
@@ -279,6 +281,10 @@ impl MetaStateHandler {
self.state.write().unwrap().next_state(become_leader(true));
}
if let Some(ticker) = self.region_supervisor_ticker.as_ref() {
ticker.start();
}
if let Err(e) = self.procedure_manager.start().await {
error!(e; "Failed to start procedure manager");
}
@@ -297,6 +303,12 @@ impl MetaStateHandler {
if let Err(e) = self.procedure_manager.stop().await {
error!(e; "Failed to stop procedure manager");
}
if let Some(ticker) = self.region_supervisor_ticker.as_ref() {
// Stops the supervisor ticker.
ticker.stop();
}
// Suspends reporting.
self.greptimedb_telemetry_task.should_report(false);
@@ -336,6 +348,7 @@ pub struct Metasrv {
memory_region_keeper: MemoryRegionKeeperRef,
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
region_migration_manager: RegionMigrationManagerRef,
region_supervisor_ticker: Option<RegionSupervisorTickerRef>,
plugins: Plugins,
}
@@ -367,6 +380,7 @@ impl Metasrv {
greptimedb_telemetry_task
.start()
.context(StartTelemetryTaskSnafu)?;
let region_supervisor_ticker = self.region_supervisor_ticker.clone();
let state_handler = MetaStateHandler {
greptimedb_telemetry_task,
subscribe_manager,
@@ -374,6 +388,7 @@ impl Metasrv {
wal_options_allocator: self.wal_options_allocator.clone(),
state: self.state.clone(),
leader_cached_kv_backend: leader_cached_kv_backend.clone(),
region_supervisor_ticker,
};
let _handle = common_runtime::spawn_bg(async move {
loop {


@@ -65,10 +65,10 @@ use crate::lock::DistLockRef;
use crate::metasrv::{
ElectionRef, Metasrv, MetasrvInfo, MetasrvOptions, SelectorContext, SelectorRef, TABLE_ID_SEQ,
};
use crate::procedure::region_failover::RegionFailoverManager;
use crate::procedure::region_migration::manager::RegionMigrationManager;
use crate::procedure::region_migration::DefaultContextFactory;
use crate::pubsub::PublisherRef;
use crate::region::supervisor::{RegionSupervisor, DEFAULT_TICK_INTERVAL};
use crate::selector::lease_based::LeaseBasedSelector;
use crate::selector::round_robin::RoundRobinSelector;
use crate::service::mailbox::MailboxRef;
@@ -225,6 +225,7 @@ impl MetasrvBuilder {
options.wal.clone(),
kv_backend.clone(),
));
let is_remote_wal = wal_options_allocator.is_remote_wal();
let table_metadata_allocator = table_metadata_allocator.unwrap_or_else(|| {
let sequence = Arc::new(
SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
@@ -280,6 +281,7 @@ impl MetasrvBuilder {
server_addr: options.server_addr.clone(),
},
));
let peer_lookup_service = Arc::new(MetaPeerLookupService::new(meta_peer_client.clone()));
let ddl_manager = Arc::new(
DdlManager::try_new(
DdlContext {
@@ -290,9 +292,7 @@ impl MetasrvBuilder {
table_metadata_allocator: table_metadata_allocator.clone(),
flow_metadata_manager: flow_metadata_manager.clone(),
flow_metadata_allocator: flow_metadata_allocator.clone(),
peer_lookup_service: Arc::new(MetaPeerLookupService::new(
meta_peer_client.clone(),
)),
peer_lookup_service: peer_lookup_service.clone(),
},
procedure_manager.clone(),
true,
@@ -311,32 +311,36 @@ impl MetasrvBuilder {
));
region_migration_manager.try_start()?;
if !is_remote_wal && options.enable_region_failover {
return error::UnexpectedSnafu {
violated: "Region failover is not supported in the local WAL implementation!",
}
.fail();
}
let (region_failover_handler, region_supervisor_ticker) =
if options.enable_region_failover && is_remote_wal {
let region_supervisor = RegionSupervisor::new(
options.failure_detector,
DEFAULT_TICK_INTERVAL,
selector_ctx.clone(),
selector.clone(),
region_migration_manager.clone(),
leader_cached_kv_backend.clone() as _,
peer_lookup_service,
);
let region_supervisor_ticker = region_supervisor.ticker();
(
Some(RegionFailureHandler::new(region_supervisor)),
Some(region_supervisor_ticker),
)
} else {
(None, None)
};
let handler_group = match handler_group {
Some(handler_group) => handler_group,
None => {
let region_failover_handler = if options.enable_region_failover {
let region_failover_manager = Arc::new(RegionFailoverManager::new(
distributed_time_constants::REGION_LEASE_SECS,
in_memory.clone(),
kv_backend.clone(),
mailbox.clone(),
procedure_manager.clone(),
(selector.clone(), selector_ctx.clone()),
lock.clone(),
table_metadata_manager.clone(),
));
Some(
RegionFailureHandler::try_new(
election.clone(),
region_failover_manager,
options.failure_detector.clone(),
)
.await?,
)
} else {
None
};
let publish_heartbeat_handler = plugins
.clone()
.and_then(|plugins| plugins.get::<PublisherRef>())
@@ -406,6 +410,7 @@ impl MetasrvBuilder {
plugins: plugins.unwrap_or_else(Plugins::default),
memory_region_keeper,
region_migration_manager,
region_supervisor_ticker,
})
}
}


@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// TODO(weny): remove it.
#[allow(unused)]
pub mod region_failover;
pub mod region_migration;
#[cfg(test)]


@@ -19,7 +19,7 @@ pub(crate) mod migration_end;
pub(crate) mod migration_start;
pub(crate) mod open_candidate_region;
#[cfg(test)]
pub(crate) mod test_util;
pub mod test_util;
pub(crate) mod update_metadata;
pub(crate) mod upgrade_candidate_region;
@@ -43,6 +43,7 @@ use common_procedure::error::{
};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status, StringKey};
pub use manager::RegionMigrationProcedureTask;
use manager::{RegionMigrationProcedureGuard, RegionMigrationProcedureTracker};
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
@@ -364,43 +365,61 @@ pub struct RegionMigrationData<'a> {
state: &'a dyn State,
}
pub struct RegionMigrationProcedure {
pub(crate) struct RegionMigrationProcedure {
state: Box<dyn State>,
context: Context,
_guard: Option<RegionMigrationProcedureGuard>,
}
#[allow(dead_code)]
impl RegionMigrationProcedure {
const TYPE_NAME: &'static str = "metasrv-procedure::RegionMigration";
pub fn new(
persistent_context: PersistentContext,
context_factory: impl ContextFactory,
guard: Option<RegionMigrationProcedureGuard>,
) -> Self {
let state = Box::new(RegionMigrationStart {});
Self::new_inner(state, persistent_context, context_factory)
Self::new_inner(state, persistent_context, context_factory, guard)
}
fn new_inner(
state: Box<dyn State>,
persistent_context: PersistentContext,
context_factory: impl ContextFactory,
guard: Option<RegionMigrationProcedureGuard>,
) -> Self {
Self {
state,
context: context_factory.new_context(persistent_context),
_guard: guard,
}
}
fn from_json(json: &str, context_factory: impl ContextFactory) -> ProcedureResult<Self> {
fn from_json(
json: &str,
context_factory: impl ContextFactory,
tracker: RegionMigrationProcedureTracker,
) -> ProcedureResult<Self> {
let RegionMigrationDataOwned {
persistent_ctx,
state,
} = serde_json::from_str(json).context(FromJsonSnafu)?;
let guard = tracker.insert_running_procedure(&RegionMigrationProcedureTask {
cluster_id: persistent_ctx.cluster_id,
region_id: persistent_ctx.region_id,
from_peer: persistent_ctx.from_peer.clone(),
to_peer: persistent_ctx.to_peer.clone(),
replay_timeout: persistent_ctx.replay_timeout,
});
let context = context_factory.new_context(persistent_ctx);
Ok(Self { state, context })
Ok(Self {
state,
context,
_guard: guard,
})
}
}
@@ -467,7 +486,7 @@ mod tests {
let env = TestingEnv::new();
let context = env.context_factory();
let procedure = RegionMigrationProcedure::new(persistent_context, context);
let procedure = RegionMigrationProcedure::new(persistent_context, context, None);
let key = procedure.lock_key();
let keys = key.keys_to_lock().cloned().collect::<Vec<_>>();
@@ -484,7 +503,7 @@ mod tests {
let env = TestingEnv::new();
let context = env.context_factory();
let procedure = RegionMigrationProcedure::new(persistent_context, context);
let procedure = RegionMigrationProcedure::new(persistent_context, context, None);
let serialized = procedure.dump().unwrap();
let expected = r#"{"persistent_ctx":{"catalog":"greptime","schema":"public","cluster_id":0,"from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105,"replay_timeout":"1s"},"state":{"region_migration_state":"RegionMigrationStart"}}"#;
@@ -531,7 +550,7 @@ mod tests {
let persistent_context = new_persistent_context();
let context_factory = env.context_factory();
let state = Box::<MockState>::default();
RegionMigrationProcedure::new_inner(state, persistent_context, context_factory)
RegionMigrationProcedure::new_inner(state, persistent_context, context_factory, None)
}
let ctx = TestingEnv::procedure_context();
@@ -550,8 +569,11 @@ mod tests {
let serialized = procedure.dump().unwrap();
let context_factory = env.context_factory();
let tracker = env.tracker();
let mut procedure =
RegionMigrationProcedure::from_json(&serialized, context_factory).unwrap();
RegionMigrationProcedure::from_json(&serialized, context_factory, tracker.clone())
.unwrap();
assert!(tracker.contains(procedure.context.persistent_ctx.region_id));
for _ in 1..3 {
status = Some(procedure.execute(&ctx).await.unwrap());


@@ -39,8 +39,41 @@ pub type RegionMigrationManagerRef = Arc<RegionMigrationManager>;
/// Manager of region migration procedure.
pub struct RegionMigrationManager {
procedure_manager: ProcedureManagerRef,
running_procedures: Arc<RwLock<HashMap<RegionId, RegionMigrationProcedureTask>>>,
context_factory: DefaultContextFactory,
tracker: RegionMigrationProcedureTracker,
}
#[derive(Default, Clone)]
pub(crate) struct RegionMigrationProcedureTracker {
running_procedures: Arc<RwLock<HashMap<RegionId, RegionMigrationProcedureTask>>>,
}
impl RegionMigrationProcedureTracker {
/// Returns the [RegionMigrationProcedureGuard] if the region isn't already migrating.
pub(crate) fn insert_running_procedure(
&self,
task: &RegionMigrationProcedureTask,
) -> Option<RegionMigrationProcedureGuard> {
let mut procedures = self.running_procedures.write().unwrap();
match procedures.entry(task.region_id) {
Entry::Occupied(_) => None,
Entry::Vacant(v) => {
v.insert(task.clone());
Some(RegionMigrationProcedureGuard {
region_id: task.region_id,
running_procedures: self.running_procedures.clone(),
})
}
}
}
/// Returns true if it contains the specific region (`region_id`).
pub(crate) fn contains(&self, region_id: RegionId) -> bool {
self.running_procedures
.read()
.unwrap()
.contains_key(&region_id)
}
}
/// The guard of running [RegionMigrationProcedureTask].
@@ -51,10 +84,17 @@ pub(crate) struct RegionMigrationProcedureGuard {
impl Drop for RegionMigrationProcedureGuard {
fn drop(&mut self) {
self.running_procedures
.write()
let exists = self
.running_procedures
.read()
.unwrap()
.remove(&self.region_id);
.contains_key(&self.region_id);
if exists {
self.running_procedures
.write()
.unwrap()
.remove(&self.region_id);
}
}
}
@@ -96,27 +136,34 @@ impl Display for RegionMigrationProcedureTask {
}
impl RegionMigrationManager {
/// Returns new [RegionMigrationManager]
/// Returns new [`RegionMigrationManager`]
pub(crate) fn new(
procedure_manager: ProcedureManagerRef,
context_factory: DefaultContextFactory,
) -> Self {
Self {
procedure_manager,
running_procedures: Arc::new(RwLock::new(HashMap::new())),
context_factory,
tracker: RegionMigrationProcedureTracker::default(),
}
}
/// Returns the [`RegionMigrationProcedureTracker`].
pub(crate) fn tracker(&self) -> &RegionMigrationProcedureTracker {
&self.tracker
}
/// Registers the loader of [RegionMigrationProcedure] to the `ProcedureManager`.
pub(crate) fn try_start(&self) -> Result<()> {
let context_factory = self.context_factory.clone();
let tracker = self.tracker.clone();
self.procedure_manager
.register_loader(
RegionMigrationProcedure::TYPE_NAME,
Box::new(move |json| {
let context_factory = context_factory.clone();
RegionMigrationProcedure::from_json(json, context_factory)
let tracker = tracker.clone();
RegionMigrationProcedure::from_json(json, context_factory, tracker)
.map(|p| Box::new(p) as _)
}),
)
@@ -129,18 +176,7 @@ impl RegionMigrationManager {
&self,
task: &RegionMigrationProcedureTask,
) -> Option<RegionMigrationProcedureGuard> {
let mut procedures = self.running_procedures.write().unwrap();
match procedures.entry(task.region_id) {
Entry::Occupied(_) => None,
Entry::Vacant(v) => {
v.insert(task.clone());
Some(RegionMigrationProcedureGuard {
region_id: task.region_id,
running_procedures: self.running_procedures.clone(),
})
}
}
self.tracker.insert_running_procedure(task)
}
fn verify_task(&self, task: &RegionMigrationProcedureTask) -> Result<()> {
@@ -210,6 +246,10 @@ impl RegionMigrationManager {
region_route: &RegionRoute,
task: &RegionMigrationProcedureTask,
) -> Result<bool> {
if region_route.is_leader_downgraded() {
return Ok(false);
}
let leader_peer = region_route
.leader_peer
.as_ref()
@@ -301,15 +341,13 @@ impl RegionMigrationManager {
replay_timeout,
},
self.context_factory.clone(),
Some(guard),
);
let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
let procedure_id = procedure_with_id.id;
info!("Starting region migration procedure {procedure_id} for {task}");
let procedure_manager = self.procedure_manager.clone();
common_runtime::spawn_bg(async move {
let _ = guard;
let watcher = &mut match procedure_manager.submit(procedure_with_id).await {
Ok(watcher) => watcher,
Err(e) => {
@@ -356,6 +394,7 @@ mod test {
};
// Inserts one
manager
.tracker
.running_procedures
.write()
.unwrap()
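The tracker above enforces at most one running migration per region, and the guard, now moved into the procedure itself, frees the slot on drop. A crate-internal, test-style sketch of those semantics (the test name and the concrete task values are made up for illustration):

#[test]
fn migration_slot_is_exclusive_per_region() {
    use std::time::Duration;
    use common_meta::peer::Peer;
    use store_api::storage::RegionId;

    let tracker = RegionMigrationProcedureTracker::default();
    let task = RegionMigrationProcedureTask {
        cluster_id: 0,
        region_id: RegionId::new(1024, 1),
        from_peer: Peer::empty(1),
        to_peer: Peer::empty(2),
        replay_timeout: Duration::from_secs(1),
    };
    // First submission takes the slot for this region ...
    let guard = tracker.insert_running_procedure(&task).expect("no migration running yet");
    // ... a second submission for the same region is rejected ...
    assert!(tracker.insert_running_procedure(&task).is_none());
    // ... and dropping the guard (e.g. the procedure finished) releases the region.
    drop(guard);
    assert!(!tracker.contains(task.region_id));
}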


@@ -26,6 +26,7 @@ use common_meta::instruction::{
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
use common_meta::peer::Peer;
use common_meta::region_keeper::{MemoryRegionKeeper, MemoryRegionKeeperRef};
use common_meta::rpc::router::RegionRoute;
@@ -42,6 +43,7 @@ use store_api::storage::RegionId;
use table::metadata::RawTableInfo;
use tokio::sync::mpsc::{Receiver, Sender};
use super::manager::RegionMigrationProcedureTracker;
use super::migration_abort::RegionMigrationAbort;
use super::upgrade_candidate_region::UpgradeCandidateRegion;
use super::{Context, ContextFactory, DefaultContextFactory, State, VolatileContext};
@@ -94,6 +96,14 @@ pub struct TestingEnv {
opening_region_keeper: MemoryRegionKeeperRef,
server_addr: String,
procedure_manager: ProcedureManagerRef,
tracker: RegionMigrationProcedureTracker,
kv_backend: KvBackendRef,
}
impl Default for TestingEnv {
fn default() -> Self {
Self::new()
}
}
impl TestingEnv {
@@ -117,9 +127,21 @@ impl TestingEnv {
mailbox_ctx,
server_addr: "localhost".to_string(),
procedure_manager,
tracker: Default::default(),
kv_backend,
}
}
/// Returns the [KvBackendRef].
pub fn kv_backend(&self) -> KvBackendRef {
self.kv_backend.clone()
}
/// Returns the [RegionMigrationProcedureTracker].
pub(crate) fn tracker(&self) -> RegionMigrationProcedureTracker {
self.tracker.clone()
}
/// Returns a context of region migration procedure.
pub fn context_factory(&self) -> DefaultContextFactory {
DefaultContextFactory {
@@ -431,7 +453,7 @@ impl ProcedureMigrationTestSuite {
/// The step of test.
#[derive(Clone)]
pub enum Step {
pub(crate) enum Step {
Setup((String, BeforeTest)),
Next((String, Option<BeforeTest>, Assertion)),
}


@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod failure_detector;
pub mod lease_keeper;
pub mod supervisor;
pub use lease_keeper::RegionLeaseKeeper;


@@ -0,0 +1,131 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ops::DerefMut;
use common_meta::{ClusterId, DatanodeId};
use dashmap::mapref::multiple::RefMulti;
use dashmap::DashMap;
use store_api::storage::RegionId;
use crate::failure_detector::{PhiAccrualFailureDetector, PhiAccrualFailureDetectorOptions};
pub(crate) type Ident = (ClusterId, DatanodeId, RegionId);
/// Detects the region failures.
pub(crate) struct RegionFailureDetector {
options: PhiAccrualFailureDetectorOptions,
detectors: DashMap<Ident, PhiAccrualFailureDetector>,
}
pub(crate) struct FailureDetectorEntry<'a> {
e: RefMulti<'a, Ident, PhiAccrualFailureDetector>,
}
impl FailureDetectorEntry<'_> {
pub(crate) fn region_ident(&self) -> &Ident {
self.e.key()
}
pub(crate) fn failure_detector(&self) -> &PhiAccrualFailureDetector {
self.e.value()
}
}
impl RegionFailureDetector {
pub(crate) fn new(options: PhiAccrualFailureDetectorOptions) -> Self {
Self {
options,
detectors: DashMap::new(),
}
}
/// Returns the [PhiAccrualFailureDetector] for the given ([ClusterId], [DatanodeId], [RegionId]) ident.
pub(crate) fn region_failure_detector(
&self,
ident: Ident,
) -> impl DerefMut<Target = PhiAccrualFailureDetector> + '_ {
self.detectors
.entry(ident)
.or_insert_with(|| PhiAccrualFailureDetector::from_options(self.options))
}
/// Returns a [FailureDetectorEntry] iterator.
pub(crate) fn iter(&self) -> impl Iterator<Item = FailureDetectorEntry> + '_ {
self.detectors
.iter()
.map(move |e| FailureDetectorEntry { e })
}
/// Removes the specific [PhiAccrualFailureDetector] if it exists.
pub(crate) fn remove(&self, ident: &Ident) {
self.detectors.remove(ident);
}
/// Removes all [PhiAccrualFailureDetector]s.
pub(crate) fn clear(&self) {
self.detectors.clear()
}
/// Returns true if the specific `ident` exists.
#[cfg(test)]
pub(crate) fn contains(&self, ident: &Ident) -> bool {
self.detectors.contains_key(ident)
}
/// Returns the number of registered failure detectors.
#[cfg(test)]
pub(crate) fn len(&self) -> usize {
self.detectors.len()
}
/// Returns true if it's empty
#[cfg(test)]
pub(crate) fn is_empty(&self) -> bool {
self.detectors.is_empty()
}
#[cfg(test)]
pub(crate) fn dump(&self) -> RegionFailureDetector {
let mut m = DashMap::with_capacity(self.detectors.len());
m.extend(self.detectors.iter().map(|x| (*x.key(), x.value().clone())));
Self {
detectors: m,
options: self.options,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_default_failure_detector_container() {
let container = RegionFailureDetector::new(Default::default());
let ident = (0, 2, RegionId::new(1, 1));
let _ = container.region_failure_detector(ident);
assert!(container.contains(&ident));
{
let mut iter = container.iter();
let _ = iter.next().unwrap();
assert!(iter.next().is_none());
}
container.clear();
assert!(container.is_empty());
}
}
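The container keeps one phi-accrual detector per (cluster, datanode, region) ident. A crate-internal sketch of the intended flow, assuming the types above are in scope: feed heartbeats roughly once per second, then ask which regions are no longer available, which is what the supervisor does on every tick.

use common_time::util::current_time_millis;
use store_api::storage::RegionId;

fn detect_failed_regions_sketch() {
    let detectors = RegionFailureDetector::new(Default::default());
    let ident = (0, 1, RegionId::new(1, 1));

    // Report a few heartbeats at ~1s intervals; the per-ident detector is
    // created lazily on first access.
    let start = current_time_millis();
    for i in 0..10 {
        let mut detector = detectors.region_failure_detector(ident);
        detector.heartbeat(start + i * 1000);
    }

    // Collect the idents whose detectors consider the region unavailable "now".
    let failed: Vec<_> = detectors
        .iter()
        .filter(|e| !e.failure_detector().is_available(current_time_millis()))
        .map(|e| *e.region_ident())
        .collect();
    println!("failed regions: {failed:?}");
}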


@@ -0,0 +1,529 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use common_meta::key::MAINTENANCE_KEY;
use common_meta::kv_backend::KvBackendRef;
use common_meta::peer::PeerLookupServiceRef;
use common_meta::{ClusterId, DatanodeId};
use common_runtime::JoinHandle;
use common_telemetry::{error, info, warn};
use common_time::util::current_time_millis;
use error::Error::{MigrationRunning, TableRouteNotFound};
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::time::{interval, MissedTickBehavior};
use super::failure_detector::RegionFailureDetector;
use crate::error::{self, Result};
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::handler::node_stat::Stat;
use crate::metasrv::{SelectorContext, SelectorRef};
use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
use crate::procedure::region_migration::RegionMigrationProcedureTask;
use crate::selector::SelectorOptions;
/// `DatanodeHeartbeat` represents the heartbeat signal sent from a datanode.
/// It includes identifiers for the cluster and datanode, a list of regions being monitored,
/// and a timestamp indicating when the heartbeat was sent.
#[derive(Debug)]
pub(crate) struct DatanodeHeartbeat {
cluster_id: ClusterId,
datanode_id: DatanodeId,
// TODO(weny): Considers collecting the memtable size in regions.
regions: Vec<RegionId>,
timestamp: i64,
}
impl From<&Stat> for DatanodeHeartbeat {
fn from(value: &Stat) -> Self {
DatanodeHeartbeat {
cluster_id: value.cluster_id,
datanode_id: value.id,
regions: value.region_stats.iter().map(|x| x.id).collect(),
timestamp: value.timestamp_millis,
}
}
}
/// `Event` represents various types of events that can be processed by the region supervisor.
/// These events are crucial for managing state transitions and handling specific scenarios
/// in the region lifecycle.
///
/// Variants:
/// - `Tick`: This event is used to trigger region failure detection periodically.
/// - `HeartbeatArrived`: This event indicates that the metasrv received a [`DatanodeHeartbeat`] from a datanode.
/// - `Clear`: This event is used to reset the state of the supervisor, typically used
/// when a system-wide reset or reinitialization is needed.
/// - `Dump`: (Available only in test) This event triggers a dump of the
/// current state for debugging purposes. It allows developers to inspect the internal state
/// of the supervisor during tests.
pub(crate) enum Event {
Tick,
HeartbeatArrived(DatanodeHeartbeat),
Clear,
#[cfg(test)]
Dump(tokio::sync::oneshot::Sender<RegionFailureDetector>),
}
impl Debug for Event {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Tick => write!(f, "Tick"),
Self::HeartbeatArrived(arg0) => f.debug_tuple("HeartbeatArrived").field(arg0).finish(),
Self::Clear => write!(f, "Clear"),
#[cfg(test)]
Self::Dump(_) => f.debug_struct("Dump").finish(),
}
}
}
pub type RegionSupervisorTickerRef = Arc<RegionSupervisorTicker>;
/// A background job to generate [`Event::Tick`] type events.
#[derive(Debug)]
pub struct RegionSupervisorTicker {
/// The [`Option`] wrapper allows us to abort the job when the [`RegionSupervisorTicker`] is stopped or dropped.
tick_handle: Mutex<Option<JoinHandle<()>>>,
/// The interval of tick.
tick_interval: Duration,
/// Sends [Event]s.
sender: Sender<Event>,
}
impl RegionSupervisorTicker {
/// Starts the ticker.
pub fn start(&self) {
let mut handle = self.tick_handle.lock().unwrap();
if handle.is_none() {
let sender = self.sender.clone();
let tick_interval = self.tick_interval;
let ticker_loop = tokio::spawn(async move {
let mut interval = interval(tick_interval);
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
if let Err(err) = sender.send(Event::Clear).await {
warn!(err; "EventReceiver is dropped, failed to send Event::Clear");
return;
}
loop {
interval.tick().await;
if sender.send(Event::Tick).await.is_err() {
info!("EventReceiver is dropped, tick loop is stopped");
break;
}
}
});
*handle = Some(ticker_loop);
}
}
/// Stops the ticker.
pub fn stop(&self) {
let handle = self.tick_handle.lock().unwrap().take();
if let Some(handle) = handle {
handle.abort();
info!("The tick loop is stopped.");
}
}
}
impl Drop for RegionSupervisorTicker {
fn drop(&mut self) {
self.stop();
}
}
pub type RegionSupervisorRef = Arc<RegionSupervisor>;
/// The default tick interval.
pub const DEFAULT_TICK_INTERVAL: Duration = Duration::from_secs(1);
/// The [`RegionSupervisor`] is used to detect Region failures
/// and initiate Region failover upon detection, ensuring uninterrupted region service.
pub struct RegionSupervisor {
/// Used to detect the failure of regions.
failure_detector: RegionFailureDetector,
/// The interval of tick
tick_interval: Duration,
/// Receives [Event]s.
receiver: Receiver<Event>,
/// [Event] Sender.
sender: Sender<Event>,
/// The context of [`SelectorRef`]
selector_context: SelectorContext,
/// Candidate node selector.
selector: SelectorRef,
/// Region migration manager.
region_migration_manager: RegionMigrationManagerRef,
// TODO(weny): find a better way
kv_backend: KvBackendRef,
/// Peer lookup service
peer_lookup: PeerLookupServiceRef,
}
/// [`HeartbeatAcceptor`] forwards heartbeats to [`RegionSupervisor`].
pub(crate) struct HeartbeatAcceptor {
sender: Sender<Event>,
}
impl HeartbeatAcceptor {
/// Accepts heartbeats from datanodes.
pub(crate) async fn accept(&self, heartbeat: DatanodeHeartbeat) {
if let Err(e) = self.sender.send(Event::HeartbeatArrived(heartbeat)).await {
error!(e; "RegionSupervisor is stop receiving heartbeat");
}
}
}
#[cfg(test)]
impl RegionSupervisor {
/// Returns the [Event] sender.
pub(crate) fn sender(&self) -> Sender<Event> {
self.sender.clone()
}
}
impl RegionSupervisor {
pub(crate) fn new(
options: PhiAccrualFailureDetectorOptions,
tick_interval: Duration,
selector_context: SelectorContext,
selector: SelectorRef,
region_migration_manager: RegionMigrationManagerRef,
kv_backend: KvBackendRef,
peer_lookup: PeerLookupServiceRef,
) -> Self {
let (tx, rx) = tokio::sync::mpsc::channel(1024);
Self {
failure_detector: RegionFailureDetector::new(options),
tick_interval,
receiver: rx,
sender: tx,
selector_context,
selector,
region_migration_manager,
kv_backend,
peer_lookup,
}
}
/// Returns the [`HeartbeatAcceptor`].
pub(crate) fn heartbeat_acceptor(&self) -> HeartbeatAcceptor {
HeartbeatAcceptor {
sender: self.sender.clone(),
}
}
/// Returns the [`RegionSupervisorTicker`].
pub(crate) fn ticker(&self) -> RegionSupervisorTickerRef {
Arc::new(RegionSupervisorTicker {
tick_interval: self.tick_interval,
sender: self.sender.clone(),
tick_handle: Mutex::new(None),
})
}
/// Runs the main loop.
pub(crate) async fn run(&mut self) {
while let Some(event) = self.receiver.recv().await {
match event {
Event::Tick => {
let regions = self.detect_region_failure();
self.handle_region_failures(regions).await;
}
Event::HeartbeatArrived(heartbeat) => self.on_heartbeat_arrived(heartbeat),
Event::Clear => self.clear(),
#[cfg(test)]
Event::Dump(sender) => {
let _ = sender.send(self.failure_detector.dump());
}
}
}
info!("RegionSupervisor is stopped!");
}
async fn handle_region_failures(&self, mut regions: Vec<(ClusterId, DatanodeId, RegionId)>) {
if regions.is_empty() {
return;
}
match self.is_maintenance_mode().await {
Ok(false) => {}
Ok(true) => {
info!("Maintenance mode is enabled, skip failover");
return;
}
Err(err) => {
error!(err; "Failed to check maintenance mode");
return;
}
}
let migrating_regions = regions
.extract_if(|(_, _, region_id)| {
self.region_migration_manager.tracker().contains(*region_id)
})
.collect::<Vec<_>>();
for (cluster_id, datanode_id, region_id) in migrating_regions {
self.failure_detector
.remove(&(cluster_id, datanode_id, region_id));
}
warn!("Detects region failures: {:?}", regions);
for (cluster_id, datanode_id, region_id) in regions {
match self.do_failover(cluster_id, datanode_id, region_id).await {
Ok(_) => self
.failure_detector
.remove(&(cluster_id, datanode_id, region_id)),
Err(err) => {
error!(err; "Failed to execute region failover for region: {region_id}, datanode: {datanode_id}");
}
}
}
}
pub(crate) async fn is_maintenance_mode(&self) -> Result<bool> {
self.kv_backend
.exists(MAINTENANCE_KEY.as_bytes())
.await
.context(error::KvBackendSnafu)
}
async fn do_failover(
&self,
cluster_id: ClusterId,
datanode_id: DatanodeId,
region_id: RegionId,
) -> Result<()> {
let from_peer = self
.peer_lookup
.datanode(cluster_id, datanode_id)
.await
.context(error::LookupPeerSnafu {
peer_id: datanode_id,
})?
.context(error::PeerUnavailableSnafu {
peer_id: datanode_id,
})?;
let mut peers = self
.selector
.select(
cluster_id,
&self.selector_context,
SelectorOptions {
min_required_items: 1,
allow_duplication: false,
},
)
.await?;
let to_peer = peers.remove(0);
let task = RegionMigrationProcedureTask {
cluster_id,
region_id,
from_peer,
to_peer,
replay_timeout: Duration::from_secs(60),
};
if let Err(err) = self.region_migration_manager.submit_procedure(task).await {
return match err {
// Returns Ok if a migration is already running or the table has been dropped.
MigrationRunning { .. } | TableRouteNotFound { .. } => Ok(()),
err => Err(err),
};
};
Ok(())
}
/// Detects the failure of regions.
fn detect_region_failure(&self) -> Vec<(ClusterId, DatanodeId, RegionId)> {
self.failure_detector
.iter()
.filter_map(|e| {
// Intentionally not place `current_time_millis()` out of the iteration.
// The failure detection determination should be happened "just in time",
// i.e., failed or not has to be compared with the most recent "now".
// Besides, it might reduce the false positive of failure detection,
// because during the iteration, heartbeats are coming in as usual,
// and the `phi`s are still updating.
if !e.failure_detector().is_available(current_time_millis()) {
Some(*e.region_ident())
} else {
None
}
})
.collect::<Vec<_>>()
}
/// Updates the state of corresponding failure detectors.
fn on_heartbeat_arrived(&self, heartbeat: DatanodeHeartbeat) {
for region_id in heartbeat.regions {
let ident = (heartbeat.cluster_id, heartbeat.datanode_id, region_id);
let mut detector = self.failure_detector.region_failure_detector(ident);
detector.heartbeat(heartbeat.timestamp);
}
}
fn clear(&self) {
self.failure_detector.clear();
}
}
#[cfg(test)]
pub(crate) mod tests {
use std::assert_matches::assert_matches;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use common_meta::peer::Peer;
use common_meta::test_util::NoopPeerLookupService;
use common_time::util::current_time_millis;
use rand::Rng;
use store_api::storage::RegionId;
use tokio::sync::oneshot;
use tokio::time::sleep;
use crate::procedure::region_migration::manager::RegionMigrationManager;
use crate::procedure::region_migration::test_util::TestingEnv;
use crate::region::supervisor::{
DatanodeHeartbeat, Event, RegionSupervisor, RegionSupervisorTicker,
};
use crate::selector::test_utils::{new_test_selector_context, RandomNodeSelector};
pub(crate) fn new_test_supervisor() -> RegionSupervisor {
let env = TestingEnv::new();
let selector_context = new_test_selector_context();
let selector = Arc::new(RandomNodeSelector::new(vec![Peer::empty(1)]));
let context_factory = env.context_factory();
let region_migration_manager = Arc::new(RegionMigrationManager::new(
env.procedure_manager().clone(),
context_factory,
));
let kv_backend = env.kv_backend();
let peer_lookup = Arc::new(NoopPeerLookupService);
RegionSupervisor::new(
Default::default(),
Duration::from_secs(1),
selector_context,
selector,
region_migration_manager,
kv_backend,
peer_lookup,
)
}
#[tokio::test]
async fn test_heartbeat() {
let mut supervisor = new_test_supervisor();
let sender = supervisor.sender();
tokio::spawn(async move { supervisor.run().await });
sender
.send(Event::HeartbeatArrived(DatanodeHeartbeat {
cluster_id: 0,
datanode_id: 0,
regions: vec![RegionId::new(1, 1)],
timestamp: 100,
}))
.await
.unwrap();
let (tx, rx) = oneshot::channel();
sender.send(Event::Dump(tx)).await.unwrap();
let detector = rx.await.unwrap();
assert!(detector.contains(&(0, 0, RegionId::new(1, 1))));
// Clear up
sender.send(Event::Clear).await.unwrap();
let (tx, rx) = oneshot::channel();
sender.send(Event::Dump(tx)).await.unwrap();
assert!(rx.await.unwrap().is_empty());
fn generate_heartbeats(datanode_id: u64, region_ids: Vec<u32>) -> Vec<DatanodeHeartbeat> {
let mut rng = rand::thread_rng();
let start = current_time_millis();
(0..2000)
.map(|i| DatanodeHeartbeat {
timestamp: start + i * 1000 + rng.gen_range(0..100),
cluster_id: 0,
datanode_id,
regions: region_ids
.iter()
.map(|number| RegionId::new(0, *number))
.collect(),
})
.collect::<Vec<_>>()
}
let heartbeats = generate_heartbeats(100, vec![1, 2, 3]);
let last_heartbeat_time = heartbeats.last().unwrap().timestamp;
for heartbeat in heartbeats {
sender
.send(Event::HeartbeatArrived(heartbeat))
.await
.unwrap();
}
let (tx, rx) = oneshot::channel();
sender.send(Event::Dump(tx)).await.unwrap();
let detector = rx.await.unwrap();
assert_eq!(detector.len(), 3);
for e in detector.iter() {
let fd = e.failure_detector();
let acceptable_heartbeat_pause_millis = fd.acceptable_heartbeat_pause_millis() as i64;
let start = last_heartbeat_time;
// Within the "acceptable_heartbeat_pause_millis" period, phi is zero ...
for i in 1..=acceptable_heartbeat_pause_millis / 1000 {
let now = start + i * 1000;
assert_eq!(fd.phi(now), 0.0);
}
// ... then in less than two seconds, phi is above the threshold.
// The same effect can be seen in the diagrams in Akka's document.
let now = start + acceptable_heartbeat_pause_millis + 1000;
assert!(fd.phi(now) < fd.threshold() as _);
let now = start + acceptable_heartbeat_pause_millis + 2000;
assert!(fd.phi(now) > fd.threshold() as _);
}
}
#[tokio::test]
async fn test_supervisor_ticker() {
let (tx, mut rx) = tokio::sync::mpsc::channel(128);
let ticker = RegionSupervisorTicker {
tick_handle: Mutex::new(None),
tick_interval: Duration::from_millis(10),
sender: tx,
};
// It's ok if we start the ticker again.
for _ in 0..2 {
ticker.start();
sleep(Duration::from_millis(100)).await;
ticker.stop();
assert!(!rx.is_empty());
while let Ok(event) = rx.try_recv() {
assert_matches!(event, Event::Tick | Event::Clear);
}
}
}
}
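The ticker is the "use interval instead of sleep" change from the commit message: a fixed-rate tokio interval that skips missed ticks instead of bursting to catch up, feeding `Event::Tick` into the supervisor's channel. A standalone sketch of the same pattern, independent of the metasrv types:

use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::{interval, MissedTickBehavior};

#[derive(Debug)]
enum TickEvent {
    Tick,
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<TickEvent>(128);

    // Fixed-rate ticker: if a tick is missed (e.g. the task was starved),
    // skip it instead of firing a burst of catch-up ticks.
    let handle = tokio::spawn(async move {
        let mut ticker = interval(Duration::from_millis(100));
        ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
        loop {
            ticker.tick().await;
            if tx.send(TickEvent::Tick).await.is_err() {
                break; // receiver dropped, stop ticking
            }
        }
    });

    // Consume a few ticks, then stop the ticker by aborting its task.
    for _ in 0..3 {
        let event = rx.recv().await.unwrap();
        println!("{event:?}");
    }
    handle.abort();
}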


@@ -16,9 +16,10 @@ mod common;
pub mod lease_based;
pub mod load_based;
pub mod round_robin;
#[cfg(test)]
pub(crate) mod test_utils;
mod weight_compute;
mod weighted_choose;
use serde::{Deserialize, Serialize};
use crate::error;


@@ -0,0 +1,74 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_meta::distributed_time_constants::{FLOWNODE_LEASE_SECS, REGION_LEASE_SECS};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::peer::Peer;
use rand::prelude::SliceRandom;
use crate::cluster::MetaPeerClientBuilder;
use crate::error::Result;
use crate::metasrv::SelectorContext;
use crate::selector::{Namespace, Selector, SelectorOptions};
/// Returns [SelectorContext] for test purpose.
pub fn new_test_selector_context() -> SelectorContext {
let kv_backend = Arc::new(MemoryKvBackend::new());
let meta_peer_client = MetaPeerClientBuilder::default()
.election(None)
.in_memory(kv_backend.clone())
.build()
.map(Arc::new)
.unwrap();
SelectorContext {
server_addr: "127.0.0.1:3002".to_string(),
datanode_lease_secs: REGION_LEASE_SECS,
flownode_lease_secs: FLOWNODE_LEASE_SECS,
kv_backend,
meta_peer_client,
table_id: None,
}
}
/// It always returns shuffled `nodes`.
pub struct RandomNodeSelector {
nodes: Vec<Peer>,
}
impl RandomNodeSelector {
pub fn new(nodes: Vec<Peer>) -> Self {
Self { nodes }
}
}
#[async_trait::async_trait]
impl Selector for RandomNodeSelector {
type Context = SelectorContext;
type Output = Vec<Peer>;
async fn select(
&self,
_ns: Namespace,
_ctx: &Self::Context,
_opts: SelectorOptions,
) -> Result<Self::Output> {
let mut rng = rand::thread_rng();
let mut nodes = self.nodes.clone();
nodes.shuffle(&mut rng);
Ok(nodes)
}
}
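These helpers back the new supervisor tests: during failover the supervisor asks the selector for a single destination peer before submitting the migration. A test-style sketch of that call, assuming `Selector`, `SelectorOptions`, and `Peer` are in scope as in the module above (the test name is illustrative):

#[tokio::test]
async fn select_one_destination_peer() {
    let ctx = new_test_selector_context();
    let selector = RandomNodeSelector::new(vec![Peer::empty(1), Peer::empty(2)]);
    // Mirrors the supervisor's failover call: one candidate, no duplicates.
    let peers = selector
        .select(
            0,
            &ctx,
            SelectorOptions {
                min_required_items: 1,
                allow_duplication: false,
            },
        )
        .await
        .unwrap();
    assert!(!peers.is_empty());
}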


@@ -18,14 +18,11 @@ use std::sync::Arc;
use chrono::DateTime;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE};
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::sequence::SequenceBuilder;
use common_meta::state_store::KvStateStore;
use common_meta::ClusterId;
use common_procedure::local::{LocalManager, ManagerConfig};
use common_time::util as time_util;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema};
@@ -33,12 +30,8 @@ use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType};
use table::requests::TableOptions;
use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
use crate::handler::{HeartbeatMailbox, Pushers};
use crate::key::{DatanodeLeaseKey, LeaseValue};
use crate::lock::memory::MemLock;
use crate::metasrv::SelectorContext;
use crate::procedure::region_failover::RegionFailoverManager;
use crate::selector::lease_based::LeaseBasedSelector;
pub(crate) fn new_region_route(region_id: u64, peers: &[Peer], leader_node: u64) -> RegionRoute {
let region = Region {
@@ -79,33 +72,6 @@ pub(crate) fn create_selector_context() -> SelectorContext {
}
}
pub(crate) fn create_region_failover_manager() -> Arc<RegionFailoverManager> {
let kv_backend = Arc::new(MemoryKvBackend::new());
let pushers = Pushers::default();
let mailbox_sequence =
SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone()).build();
let mailbox = HeartbeatMailbox::create(pushers, mailbox_sequence);
let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
let procedure_manager = Arc::new(LocalManager::new(ManagerConfig::default(), state_store));
let selector = Arc::new(LeaseBasedSelector);
let selector_ctx = create_selector_context();
let in_memory = Arc::new(MemoryKvBackend::new());
Arc::new(RegionFailoverManager::new(
10,
in_memory,
kv_backend.clone(),
mailbox,
procedure_manager,
(selector, selector_ctx),
Arc::new(MemLock::default()),
Arc::new(TableMetadataManager::new(kv_backend)),
))
}
pub(crate) async fn prepare_table_region_and_info_value(
table_metadata_manager: &TableMetadataManagerRef,
table: &str,