feat: metasrv collects datanode heartbeats for region failure detection (#1214)

* feat: metasrv collects datanode heartbeats for region failure detection

* chore: change visibility

* fix: fragile tests

* Update src/meta-srv/src/handler/persist_stats_handler.rs

Co-authored-by: fys <40801205+Fengys123@users.noreply.github.com>

* Update src/meta-srv/src/handler/failure_handler.rs

Co-authored-by: fys <40801205+Fengys123@users.noreply.github.com>

* fix: resolve PR comments

* fix: resolve PR comments

* fix: resolve PR comments

---------

Co-authored-by: shuiyisong <xixing.sys@gmail.com>
Co-authored-by: fys <40801205+Fengys123@users.noreply.github.com>
LFC authored on 2023-03-24 12:28:34 +08:00, committed by GitHub
parent 92963b9614
commit 0f160a73be
15 changed files with 571 additions and 175 deletions

View File

@@ -20,7 +20,7 @@ use std::sync::Arc;
use api::v1::meta::{RegionStat, TableName};
use common_telemetry::{info, warn};
use snafu::{OptionExt, ResultExt};
use snafu::ResultExt;
use table::engine::{EngineContext, TableEngineRef};
use table::metadata::TableId;
use table::requests::CreateTableRequest;
@@ -228,34 +228,25 @@ pub(crate) async fn handle_system_table_request<'a, M: CatalogManager>(
/// The stats of regions in the datanode.
/// The number of regions can be obtained from the length of the returned vec.
pub async fn datanode_stat(catalog_manager: &CatalogManagerRef) -> Result<(u64, Vec<RegionStat>)> {
///
/// Ignores any errors that occur while iterating over regions. The intention of this method
/// is to collect the region stats carried in the Datanode's heartbeat to Metasrv, so it's a
/// best-effort job.
pub async fn datanode_stat(catalog_manager: &CatalogManagerRef) -> (u64, Vec<RegionStat>) {
let mut region_number: u64 = 0;
let mut region_stats = Vec::new();
for catalog_name in catalog_manager.catalog_names()? {
let catalog =
catalog_manager
.catalog(&catalog_name)?
.context(error::CatalogNotFoundSnafu {
catalog_name: &catalog_name,
})?;
let Ok(catalog_names) = catalog_manager.catalog_names() else { return (region_number, region_stats) };
for catalog_name in catalog_names {
let Ok(Some(catalog)) = catalog_manager.catalog(&catalog_name) else { continue };
for schema_name in catalog.schema_names()? {
let schema = catalog
.schema(&schema_name)?
.context(error::SchemaNotFoundSnafu {
catalog: &catalog_name,
schema: &schema_name,
})?;
let Ok(schema_names) = catalog.schema_names() else { continue };
for schema_name in schema_names {
let Ok(Some(schema)) = catalog.schema(&schema_name) else { continue };
for table_name in schema.table_names()? {
let table =
schema
.table(&table_name)
.await?
.context(error::TableNotFoundSnafu {
table_info: &table_name,
})?;
let Ok(table_names) = schema.table_names() else { continue };
for table_name in table_names {
let Ok(Some(table)) = schema.table(&table_name).await else { continue };
let region_numbers = &table.table_info().meta.region_numbers;
region_number += region_numbers.len() as u64;
@@ -282,6 +273,5 @@ pub async fn datanode_stat(catalog_manager: &CatalogManagerRef) -> Result<(u64,
}
}
}
Ok((region_number, region_stats))
(region_number, region_stats)
}
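
The rewrite above leans on Rust's `let ... else` syntax (stable since 1.65) to skip failing items instead of propagating errors. A minimal, self-contained sketch of the pattern (the `sum_parsed` helper is illustrative, not part of this patch):

fn sum_parsed(inputs: &[&str]) -> i64 {
    let mut sum = 0;
    for s in inputs {
        // On parse failure, skip this item instead of returning an error.
        let Ok(n) = s.parse::<i64>() else { continue };
        sum += n;
    }
    sum
}

fn main() {
    // "oops" is silently skipped, mirroring the best-effort stat collection.
    assert_eq!(sum_parsed(&["1", "oops", "2"]), 3);
}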

View File

@@ -106,13 +106,7 @@ impl HeartbeatTask {
let mut tx = Self::create_streams(&meta_client, running.clone()).await?;
common_runtime::spawn_bg(async move {
while running.load(Ordering::Acquire) {
let (region_num, region_stats) = match datanode_stat(&catalog_manager_clone).await {
Ok(datanode_stat) => (datanode_stat.0 as i64, datanode_stat.1),
Err(e) => {
error!("failed to get region status, err: {e:?}");
(-1, vec![])
}
};
let (region_num, region_stats) = datanode_stat(&catalog_manager_clone).await;
let req = HeartbeatRequest {
peer: Some(Peer {
@@ -120,7 +114,7 @@ impl HeartbeatTask {
addr: addr.clone(),
}),
node_stat: Some(NodeStat {
region_num,
region_num: region_num as _,
..Default::default()
}),
region_stats,

View File

@@ -261,7 +261,7 @@ mod tests {
let stat_val = StatValue { stats: vec![stat] }.try_into().unwrap();
let kv = KeyValue {
key: stat_key.clone().into(),
key: stat_key.into(),
value: stat_val,
};

View File

@@ -32,30 +32,27 @@ use std::collections::VecDeque;
///
/// where F is the cumulative distribution function of a normal distribution with mean
/// and standard deviation estimated from historical heartbeat inter-arrival times.
#[cfg_attr(test, derive(Clone))]
pub(crate) struct PhiAccrualFailureDetector {
/// A low threshold is prone to generate many wrong suspicions but ensures a quick detection
/// in the event of a real crash. Conversely, a high threshold generates fewer mistakes but
/// needs more time to detect actual crashes.
threshold: f64,
/// Number of samples to use for calculation of mean and standard deviation of inter-arrival
/// times.
max_sample_size: u32,
threshold: f32,
/// Minimum standard deviation to use for the normal distribution used when calculating phi.
/// A too-low standard deviation might result in too much sensitivity to sudden, but normal,
/// deviations in heartbeat inter-arrival times.
min_std_deviation_millis: f64,
min_std_deviation_millis: f32,
/// Duration corresponding to the number of potentially lost/delayed heartbeats that will be
/// accepted before considering it an anomaly.
/// This margin is important for surviving sudden, occasional pauses in heartbeat
/// arrivals, caused by, for example, network drops.
acceptable_heartbeat_pause_millis: i64,
acceptable_heartbeat_pause_millis: u32,
/// Bootstrap the stats with heartbeats that correspond to this duration, with a rather high
/// standard deviation (since the environment is unknown at the beginning).
first_heartbeat_estimate_millis: i64,
first_heartbeat_estimate_millis: u32,
heartbeat_history: HeartbeatHistory,
last_heartbeat_millis: Option<i64>,
@@ -65,14 +62,12 @@ impl Default for PhiAccrualFailureDetector {
fn default() -> Self {
// default configuration is the same as of Akka:
// https://github.com/akka/akka/blob/main/akka-cluster/src/main/resources/reference.conf#L181
let max_sample_size = 1000;
Self {
threshold: 8_f64,
max_sample_size,
min_std_deviation_millis: 100_f64,
threshold: 8_f32,
min_std_deviation_millis: 100_f32,
acceptable_heartbeat_pause_millis: 3000,
first_heartbeat_estimate_millis: 1000,
heartbeat_history: HeartbeatHistory::new(max_sample_size),
heartbeat_history: HeartbeatHistory::new(1000),
last_heartbeat_millis: None,
}
}
@@ -95,28 +90,28 @@ impl PhiAccrualFailureDetector {
// bootstrap with 2 entries with rather high standard deviation
let std_deviation = self.first_heartbeat_estimate_millis / 4;
self.heartbeat_history
.add(self.first_heartbeat_estimate_millis - std_deviation);
.add((self.first_heartbeat_estimate_millis - std_deviation) as _);
self.heartbeat_history
.add(self.first_heartbeat_estimate_millis + std_deviation);
.add((self.first_heartbeat_estimate_millis + std_deviation) as _);
}
let _ = self.last_heartbeat_millis.insert(ts_millis);
}
pub(crate) fn is_available(&self, ts_millis: i64) -> bool {
self.phi(ts_millis) < self.threshold
self.phi(ts_millis) < self.threshold as _
}
/// The suspicion level of the accrual failure detector.
///
/// If a connection does not have any records in the failure detector, it is considered healthy.
fn phi(&self, ts_millis: i64) -> f64 {
pub(crate) fn phi(&self, ts_millis: i64) -> f64 {
if let Some(last_heartbeat_millis) = self.last_heartbeat_millis {
let time_diff = ts_millis - last_heartbeat_millis;
let mean = self.heartbeat_history.mean();
let std_deviation = self
.heartbeat_history
.std_deviation()
.max(self.min_std_deviation_millis);
.max(self.min_std_deviation_millis as _);
phi(
time_diff,
@@ -128,6 +123,16 @@ impl PhiAccrualFailureDetector {
0.0
}
}
#[cfg(test)]
pub(crate) fn threshold(&self) -> f32 {
self.threshold
}
#[cfg(test)]
pub(crate) fn acceptable_heartbeat_pause_millis(&self) -> u32 {
self.acceptable_heartbeat_pause_millis
}
}
/// Calculation of phi, derived from the Cumulative distribution function for
@@ -141,6 +146,8 @@ impl PhiAccrualFailureDetector {
/// Usually phi = 1 means the likelihood that we will make a mistake is about 10%.
/// The likelihood is about 1% with phi = 2, 0.1% with phi = 3, and so on.
fn phi(time_diff: i64, mean: f64, std_deviation: f64) -> f64 {
assert_ne!(std_deviation, 0.0);
let time_diff = time_diff as f64;
let y = (time_diff - mean) / std_deviation;
let e = (-y * (1.5976 + 0.070566 * y * y)).exp();
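
For reference, a worked form of what the snippet above computes, restating the doc comment and the visible code rather than adding anything new: with y = (time_diff - mean) / std_deviation, the code uses an approximation of the normal CDF,

    F(y) ≈ 1 / (1 + exp(-y * (1.5976 + 0.070566 * y²)))

so, writing e = exp(-y * (1.5976 + 0.070566 * y²)), the suspicion level is

    phi = -log10(1 - F(y)) = -log10(e / (1 + e))

and the probability of a wrong suspicion is 10^(-phi), which is exactly where "phi = 1 ≈ 10%, phi = 2 ≈ 1%" comes from.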
@@ -155,8 +162,12 @@ fn phi(time_diff: i64, mean: f64, std_deviation: f64) -> f64 {
/// It is capped by the number of samples specified in `max_sample_size`.
///
/// The stats (mean, variance, std_deviation) are not defined for an empty HeartbeatHistory.
#[derive(Clone)]
struct HeartbeatHistory {
/// Number of samples to use for calculation of mean and standard deviation of inter-arrival
/// times.
max_sample_size: u32,
intervals: VecDeque<i64>,
interval_sum: i64,
squared_interval_sum: i64,
@@ -198,7 +209,7 @@ impl HeartbeatHistory {
let oldest = self
.intervals
.pop_front()
.expect("intervals must not empty here");
.expect("intervals must not be empty here");
self.interval_sum -= oldest;
self.squared_interval_sum -= oldest * oldest;
}
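
A short worked note on the bookkeeping above, assuming the usual running-moments formulas over the n recorded intervals:

    mean = interval_sum / n
    variance = squared_interval_sum / n - mean²
    std_deviation = sqrt(variance)

which is why `drop_oldest` can keep the stats consistent in O(1): it only has to subtract `oldest` from `interval_sum` and `oldest * oldest` from `squared_interval_sum` once the window exceeds `max_sample_size`.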
@@ -207,42 +218,9 @@ impl HeartbeatHistory {
#[cfg(test)]
mod tests {
use common_time::util::current_time_millis;
use rand::Rng;
use super::*;
#[test]
fn test_heartbeat() {
// Generate 2000 heartbeats starting from now. The heartbeat interval is one second, plus some
// random millis.
fn generate_heartbeats() -> Vec<i64> {
let mut rng = rand::thread_rng();
let start = current_time_millis();
(0..2000)
.map(|i| start + i * 1000 + rng.gen_range(0..100))
.collect::<Vec<i64>>()
}
let heartbeats = generate_heartbeats();
let mut fd = PhiAccrualFailureDetector::default();
// feed the failure detector with these heartbeats
heartbeats.iter().for_each(|x| fd.heartbeat(*x));
let start = *heartbeats.last().unwrap();
// Within the "acceptable_heartbeat_pause_millis" period, phi is zero ...
for i in 1..=fd.acceptable_heartbeat_pause_millis / 1000 {
let now = start + i * 1000;
assert_eq!(fd.phi(now), 0.0);
}
// ... then in less than two seconds, phi is above the threshold.
// The same effect can be seen in the diagrams in Akka's documentation.
let now = start + fd.acceptable_heartbeat_pause_millis + 1000;
assert!(fd.phi(now) < fd.threshold);
let now = start + fd.acceptable_heartbeat_pause_millis + 2000;
assert!(fd.phi(now) > fd.threshold);
}
#[test]
fn test_is_available() {
let ts_millis = current_time_millis();
@@ -254,12 +232,13 @@ mod tests {
fd.heartbeat(ts_millis);
let acceptable_heartbeat_pause_millis = fd.acceptable_heartbeat_pause_millis as i64;
// is available when heartbeat
assert!(fd.is_available(ts_millis));
// is available before heartbeat timeout
assert!(fd.is_available(ts_millis + fd.acceptable_heartbeat_pause_millis / 2));
assert!(fd.is_available(ts_millis + acceptable_heartbeat_pause_millis / 2));
// is not available after heartbeat timeout
assert!(!fd.is_available(ts_millis + fd.acceptable_heartbeat_pause_millis * 2));
assert!(!fd.is_available(ts_millis + acceptable_heartbeat_pause_millis * 2));
}
#[test]
@@ -286,14 +265,15 @@ mod tests {
fd.heartbeat(ts_millis);
let acceptable_heartbeat_pause_millis = fd.acceptable_heartbeat_pause_millis as i64;
// phi == 0 when heartbeat
assert_eq!(fd.phi(ts_millis), 0.0);
// phi < threshold before heartbeat timeout
let now = ts_millis + fd.acceptable_heartbeat_pause_millis / 2;
assert!(fd.phi(now) < fd.threshold);
let now = ts_millis + acceptable_heartbeat_pause_millis / 2;
assert!(fd.phi(now) < fd.threshold as _);
// phi >= threshold after heartbeat timeout
let now = ts_millis + fd.acceptable_heartbeat_pause_millis * 2;
assert!(fd.phi(now) >= fd.threshold);
let now = ts_millis + acceptable_heartbeat_pause_millis * 2;
assert!(fd.phi(now) >= fd.threshold as _);
}
// The following test cases are port from Akka's test:
@@ -349,7 +329,6 @@ mod tests {
fn test_return_phi_of_0_on_startup_when_no_heartbeats() {
let fd = PhiAccrualFailureDetector {
threshold: 8.0,
max_sample_size: 1000,
min_std_deviation_millis: 100.0,
acceptable_heartbeat_pause_millis: 0,
first_heartbeat_estimate_millis: 1000,
@@ -364,7 +343,6 @@ mod tests {
fn test_return_phi_based_on_guess_when_only_one_heartbeat() {
let mut fd = PhiAccrualFailureDetector {
threshold: 8.0,
max_sample_size: 1000,
min_std_deviation_millis: 100.0,
acceptable_heartbeat_pause_millis: 0,
first_heartbeat_estimate_millis: 1000,
@@ -381,7 +359,6 @@ mod tests {
fn test_return_phi_using_first_interval_after_second_heartbeat() {
let mut fd = PhiAccrualFailureDetector {
threshold: 8.0,
max_sample_size: 1000,
min_std_deviation_millis: 100.0,
acceptable_heartbeat_pause_millis: 0,
first_heartbeat_estimate_millis: 1000,
@@ -398,7 +375,6 @@ mod tests {
fn test_is_available_after_a_series_of_successful_heartbeats() {
let mut fd = PhiAccrualFailureDetector {
threshold: 8.0,
max_sample_size: 1000,
min_std_deviation_millis: 100.0,
acceptable_heartbeat_pause_millis: 0,
first_heartbeat_estimate_millis: 1000,
@@ -417,7 +393,6 @@ mod tests {
fn test_is_not_available_if_heartbeat_are_missed() {
let mut fd = PhiAccrualFailureDetector {
threshold: 3.0,
max_sample_size: 1000,
min_std_deviation_millis: 100.0,
acceptable_heartbeat_pause_millis: 0,
first_heartbeat_estimate_millis: 1000,
@@ -436,7 +411,6 @@ mod tests {
) {
let mut fd = PhiAccrualFailureDetector {
threshold: 8.0,
max_sample_size: 1000,
min_std_deviation_millis: 100.0,
acceptable_heartbeat_pause_millis: 3000,
first_heartbeat_estimate_millis: 1000,
@@ -476,7 +450,6 @@ mod tests {
fn test_accept_some_configured_missing_heartbeats() {
let mut fd = PhiAccrualFailureDetector {
threshold: 8.0,
max_sample_size: 1000,
min_std_deviation_millis: 100.0,
acceptable_heartbeat_pause_millis: 3000,
first_heartbeat_estimate_millis: 1000,
@@ -496,7 +469,6 @@ mod tests {
fn test_fail_after_configured_acceptable_missing_heartbeats() {
let mut fd = PhiAccrualFailureDetector {
threshold: 8.0,
max_sample_size: 1000,
min_std_deviation_millis: 100.0,
acceptable_heartbeat_pause_millis: 3000,
first_heartbeat_estimate_millis: 1000,
@@ -518,7 +490,6 @@ mod tests {
fn test_use_max_sample_size_heartbeats() {
let mut fd = PhiAccrualFailureDetector {
threshold: 8.0,
max_sample_size: 3,
min_std_deviation_millis: 100.0,
acceptable_heartbeat_pause_millis: 0,
first_heartbeat_estimate_millis: 1000,

View File

@@ -14,6 +14,7 @@
pub use check_leader_handler::CheckLeaderHandler;
pub use collect_stats_handler::CollectStatsHandler;
pub use failure_handler::RegionFailureHandler;
pub use keep_lease_handler::KeepLeaseHandler;
pub use on_leader_start::OnLeaderStartHandler;
pub use persist_stats_handler::PersistStatsHandler;
@@ -21,6 +22,7 @@ pub use response_header_handler::ResponseHeaderHandler;
mod check_leader_handler;
mod collect_stats_handler;
mod failure_handler;
mod instruction;
mod keep_lease_handler;
pub mod node_stat;
@@ -54,8 +56,8 @@ pub trait HeartbeatHandler: Send + Sync {
#[derive(Debug, Default)]
pub struct HeartbeatAccumulator {
pub header: Option<ResponseHeader>,
pub stats: Vec<Stat>,
pub instructions: Vec<Instruction>,
pub stat: Option<Stat>,
}
impl HeartbeatAccumulator {
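
With this change the accumulator carries at most one Stat per heartbeat, handed from one handler to the next. A self-contained miniature of the intended hand-off (names and the synchronous shape are illustrative; the real HeartbeatHandler trait is async and takes a Context):

#[derive(Default, Debug)]
struct Acc {
    stat: Option<u64>,
}

// CollectStatsHandler: fills the slot.
fn collect(acc: &mut Acc) {
    let _ = acc.stat.insert(42);
}

// RegionFailureHandler: reads the slot without consuming it.
fn detect(acc: &Acc) {
    if let Some(stat) = acc.stat.as_ref() {
        let _ = stat; // feed the failure detectors here
    }
}

// PersistStatsHandler: takes the slot, ending the hand-off.
fn persist(acc: &mut Acc) {
    if let Some(stat) = acc.stat.take() {
        let _ = stat; // batch and persist here
    }
}

fn main() {
    let mut acc = Acc::default();
    collect(&mut acc);
    detect(&acc);
    persist(&mut acc);
    assert!(acc.stat.is_none()); // persist consumed the stat
}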

View File

@@ -12,39 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::VecDeque;
use api::v1::meta::HeartbeatRequest;
use common_telemetry::debug;
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use super::node_stat::Stat;
use crate::error::Result;
use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;
type StatKey = (u64, u64);
pub struct CollectStatsHandler {
max_cached_stats_per_key: usize,
cache: DashMap<StatKey, VecDeque<Stat>>,
}
impl Default for CollectStatsHandler {
fn default() -> Self {
Self::new(10)
}
}
impl CollectStatsHandler {
pub fn new(max_cached_stats_per_key: usize) -> Self {
Self {
max_cached_stats_per_key,
cache: DashMap::new(),
}
}
}
pub struct CollectStatsHandler;
#[async_trait::async_trait]
impl HeartbeatHandler for CollectStatsHandler {
@@ -60,21 +36,7 @@ impl HeartbeatHandler for CollectStatsHandler {
match Stat::try_from(req.clone()) {
Ok(stat) => {
let key = (stat.cluster_id, stat.id);
match self.cache.entry(key) {
Entry::Occupied(mut e) => {
let deque = e.get_mut();
deque.push_front(stat);
if deque.len() >= self.max_cached_stats_per_key {
acc.stats = deque.drain(..).collect();
}
}
Entry::Vacant(e) => {
let mut stat_vec = VecDeque::with_capacity(self.max_cached_stats_per_key);
stat_vec.push_front(stat);
e.insert(stat_vec);
}
}
let _ = acc.stat.insert(stat);
}
Err(_) => {
debug!("Incomplete heartbeat data: {:?}", req);

View File

@@ -0,0 +1,151 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod runner;
use api::v1::meta::HeartbeatRequest;
use async_trait::async_trait;
use crate::error::Result;
use crate::handler::failure_handler::runner::{FailureDetectControl, FailureDetectRunner};
use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::{Context, ElectionRef};
#[derive(Eq, Hash, PartialEq, Clone)]
pub(crate) struct RegionIdent {
catalog: String,
schema: String,
table: String,
region_id: u64,
}
// TODO(LFC): TBC
pub(crate) struct DatanodeHeartbeat {
#[allow(dead_code)]
cluster_id: u64,
#[allow(dead_code)]
node_id: u64,
region_idents: Vec<RegionIdent>,
heartbeat_time: i64,
}
pub struct RegionFailureHandler {
failure_detect_runner: FailureDetectRunner,
}
impl RegionFailureHandler {
pub fn new(election: Option<ElectionRef>) -> Self {
Self {
failure_detect_runner: FailureDetectRunner::new(election),
}
}
pub async fn start(&mut self) {
self.failure_detect_runner.start().await;
}
}
#[async_trait]
impl HeartbeatHandler for RegionFailureHandler {
async fn handle(
&self,
_: &HeartbeatRequest,
ctx: &mut Context,
acc: &mut HeartbeatAccumulator,
) -> Result<()> {
if ctx.is_infancy {
self.failure_detect_runner
.send_control(FailureDetectControl::Purge)
.await;
}
if ctx.is_skip_all() {
return Ok(());
}
let Some(stat) = acc.stat.as_ref() else { return Ok(()) };
let heartbeat = DatanodeHeartbeat {
cluster_id: stat.cluster_id,
node_id: stat.id,
region_idents: stat
.region_stats
.iter()
.map(|x| RegionIdent {
catalog: x.catalog.clone(),
schema: x.schema.clone(),
table: x.table.clone(),
region_id: x.id,
})
.collect(),
heartbeat_time: stat.timestamp_millis,
};
self.failure_detect_runner.send_heartbeat(heartbeat).await;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::handler::node_stat::{RegionStat, Stat};
use crate::metasrv::builder::MetaSrvBuilder;
#[tokio::test(flavor = "multi_thread")]
async fn test_handle_heartbeat() {
let mut handler = RegionFailureHandler::new(None);
handler.start().await;
let req = &HeartbeatRequest::default();
let builder = MetaSrvBuilder::new();
let metasrv = builder.build().await;
let mut ctx = metasrv.new_ctx();
ctx.is_infancy = false;
let acc = &mut HeartbeatAccumulator::default();
fn new_region_stat(region_id: u64) -> RegionStat {
RegionStat {
id: region_id,
catalog: "a".to_string(),
schema: "b".to_string(),
table: "c".to_string(),
rcus: 0,
wcus: 0,
approximate_bytes: 0,
approximate_rows: 0,
}
}
acc.stat = Some(Stat {
cluster_id: 1,
id: 42,
region_stats: vec![new_region_stat(1), new_region_stat(2), new_region_stat(3)],
timestamp_millis: 1000,
..Default::default()
});
handler.handle(req, &mut ctx, acc).await.unwrap();
let dump = handler.failure_detect_runner.dump().await;
assert_eq!(dump.iter().collect::<Vec<_>>().len(), 3);
// infancy purges the failure detectors, so heartbeats have to be re-accumulated
ctx.is_infancy = true;
acc.stat = None;
handler.handle(req, &mut ctx, acc).await.unwrap();
let dump = handler.failure_detect_runner.dump().await;
assert_eq!(dump.iter().collect::<Vec<_>>().len(), 0);
}
}

View File

@@ -0,0 +1,309 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ops::DerefMut;
use std::sync::Arc;
use std::time::{Duration, Instant};
use common_telemetry::error;
use common_time::util::current_time_millis;
use dashmap::mapref::multiple::RefMulti;
use dashmap::DashMap;
use tokio::sync::mpsc;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::task::JoinHandle;
use crate::failure_detector::PhiAccrualFailureDetector;
use crate::handler::failure_handler::{DatanodeHeartbeat, RegionIdent};
use crate::metasrv::ElectionRef;
pub(crate) enum FailureDetectControl {
Purge,
#[cfg(test)]
Dump(tokio::sync::oneshot::Sender<FailureDetectorContainer>),
}
pub(crate) struct FailureDetectRunner {
election: Option<ElectionRef>,
heartbeat_tx: Sender<DatanodeHeartbeat>,
heartbeat_rx: Option<Receiver<DatanodeHeartbeat>>,
control_tx: Sender<FailureDetectControl>,
control_rx: Option<Receiver<FailureDetectControl>>,
receiver_handle: Option<JoinHandle<()>>,
runner_handle: Option<JoinHandle<()>>,
}
impl FailureDetectRunner {
pub(crate) fn new(election: Option<ElectionRef>) -> Self {
let (heartbeat_tx, heartbeat_rx) = mpsc::channel::<DatanodeHeartbeat>(1024);
let (control_tx, control_rx) = mpsc::channel::<FailureDetectControl>(1024);
Self {
election,
heartbeat_tx,
heartbeat_rx: Some(heartbeat_rx),
control_tx,
control_rx: Some(control_rx),
receiver_handle: None,
runner_handle: None,
}
}
pub(crate) async fn send_heartbeat(&self, heartbeat: DatanodeHeartbeat) {
if let Err(e) = self.heartbeat_tx.send(heartbeat).await {
error!("FailureDetectRunner is stop receiving heartbeats: {}", e)
}
}
pub(crate) async fn send_control(&self, control: FailureDetectControl) {
if let Err(e) = self.control_tx.send(control).await {
error!("FailureDetectRunner is stop receiving controls: {}", e)
}
}
pub(crate) async fn start(&mut self) {
let failure_detectors = Arc::new(FailureDetectorContainer(DashMap::new()));
self.start_with(failure_detectors).await
}
async fn start_with(&mut self, failure_detectors: Arc<FailureDetectorContainer>) {
let Some(mut heartbeat_rx) = self.heartbeat_rx.take() else { return };
let Some(mut control_rx) = self.control_rx.take() else { return };
let container = failure_detectors.clone();
let receiver_handle = common_runtime::spawn_bg(async move {
loop {
tokio::select! {
Some(control) = control_rx.recv() => {
match control {
FailureDetectControl::Purge => container.clear(),
#[cfg(test)]
FailureDetectControl::Dump(tx) => {
// Drain any heartbeats that have not been handled before dumping.
while let Ok(heartbeat) = heartbeat_rx.try_recv() {
for ident in heartbeat.region_idents {
let mut detector = container.get_failure_detector(ident);
detector.heartbeat(heartbeat.heartbeat_time);
}
}
let _ = tx.send(container.dump());
}
}
}
Some(heartbeat) = heartbeat_rx.recv() => {
for ident in heartbeat.region_idents {
let mut detector = container.get_failure_detector(ident);
detector.heartbeat(heartbeat.heartbeat_time);
}
}
}
}
});
self.receiver_handle = Some(receiver_handle);
let election = self.election.clone();
let runner_handle = common_runtime::spawn_bg(async move {
loop {
let start = Instant::now();
let is_leader = election.as_ref().map(|x| x.is_leader()).unwrap_or(true);
if is_leader {
for e in failure_detectors.iter() {
if e.failure_detector().is_available(current_time_millis()) {
// TODO(LFC): TBC
}
}
}
let elapsed = Instant::now().duration_since(start);
if let Some(sleep) = Duration::from_secs(1).checked_sub(elapsed) {
tokio::time::sleep(sleep).await;
} // otherwise the elapsed time exceeded one second; continue working immediately
}
});
self.runner_handle = Some(runner_handle);
}
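
The loop above uses manual elapsed/sleep bookkeeping to hold a roughly one-second cadence. For comparison, a sketch of the same tick loop written with tokio's interval timer; this is an alternative design, not what the patch does:

use std::time::Duration;

async fn tick_loop() {
    // `interval` keeps a steady cadence even when an iteration overruns;
    // see tokio::time::MissedTickBehavior for the catch-up semantics.
    let mut ticker = tokio::time::interval(Duration::from_secs(1));
    loop {
        ticker.tick().await;
        // ... check the failure detectors here ...
    }
}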
#[cfg(test)]
fn abort(&mut self) {
let Some(handle) = self.receiver_handle.take() else { return };
handle.abort();
let Some(handle) = self.runner_handle.take() else { return };
handle.abort();
}
#[cfg(test)]
pub(crate) async fn dump(&self) -> FailureDetectorContainer {
let (tx, rx) = tokio::sync::oneshot::channel();
self.send_control(FailureDetectControl::Dump(tx)).await;
rx.await.unwrap()
}
}
pub(crate) struct FailureDetectorEntry<'a> {
e: RefMulti<'a, RegionIdent, PhiAccrualFailureDetector>,
}
impl FailureDetectorEntry<'_> {
fn failure_detector(&self) -> &PhiAccrualFailureDetector {
self.e.value()
}
}
pub(crate) struct FailureDetectorContainer(DashMap<RegionIdent, PhiAccrualFailureDetector>);
impl FailureDetectorContainer {
fn get_failure_detector(
&self,
ident: RegionIdent,
) -> impl DerefMut<Target = PhiAccrualFailureDetector> + '_ {
self.0
.entry(ident)
.or_insert_with(PhiAccrualFailureDetector::default)
}
pub(crate) fn iter(&self) -> Box<dyn Iterator<Item = FailureDetectorEntry> + '_> {
Box::new(self.0.iter().map(move |e| FailureDetectorEntry { e })) as _
}
fn clear(&self) {
self.0.clear()
}
#[cfg(test)]
fn dump(&self) -> FailureDetectorContainer {
let mut m = DashMap::with_capacity(self.0.len());
m.extend(self.0.iter().map(|x| (x.key().clone(), x.value().clone())));
Self(m)
}
}
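
A note on `get_failure_detector`: the `impl DerefMut` it returns is a DashMap entry guard that holds a shard lock while alive, so its scope should stay short. A standalone illustration of the guard's scoping, assuming the dashmap API used elsewhere in this patch:

use dashmap::DashMap;

fn main() {
    let map: DashMap<&'static str, u64> = DashMap::new();
    {
        // `or_insert_with` yields a guard that derefs to the value and
        // holds the shard lock until it is dropped.
        let mut value = map.entry("a").or_insert_with(|| 0);
        *value += 1;
    } // guard dropped here: the shard lock is released before the next access
    assert_eq!(*map.get("a").unwrap(), 1);
}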
#[cfg(test)]
mod tests {
use rand::Rng;
use super::*;
#[test]
fn test_default_failure_detector_container() {
let container = FailureDetectorContainer(DashMap::new());
let ident = RegionIdent {
catalog: "a".to_string(),
schema: "b".to_string(),
table: "c".to_string(),
region_id: 1,
};
let _ = container.get_failure_detector(ident.clone());
assert!(container.0.contains_key(&ident));
{
let mut iter = container.iter();
assert!(iter.next().is_some());
assert!(iter.next().is_none());
}
container.clear();
assert!(container.0.is_empty());
}
#[tokio::test(flavor = "multi_thread")]
async fn test_control() {
let container = FailureDetectorContainer(DashMap::new());
let ident = RegionIdent {
catalog: "a".to_string(),
schema: "b".to_string(),
table: "c".to_string(),
region_id: 1,
};
container.get_failure_detector(ident.clone());
let mut runner = FailureDetectRunner::new(None);
runner.start_with(Arc::new(container)).await;
let dump = runner.dump().await;
assert_eq!(dump.iter().collect::<Vec<_>>().len(), 1);
runner.send_control(FailureDetectControl::Purge).await;
let dump = runner.dump().await;
assert_eq!(dump.iter().collect::<Vec<_>>().len(), 0);
runner.abort();
}
#[tokio::test(flavor = "multi_thread")]
async fn test_heartbeat() {
let mut runner = FailureDetectRunner::new(None);
runner.start().await;
// Generate 2000 heartbeats starting from now. The heartbeat interval is one second, plus some random millis.
fn generate_heartbeats(node_id: u64, region_ids: Vec<u64>) -> Vec<DatanodeHeartbeat> {
let mut rng = rand::thread_rng();
let start = current_time_millis();
(0..2000)
.map(|i| DatanodeHeartbeat {
cluster_id: 1,
node_id,
region_idents: region_ids
.iter()
.map(|&region_id| RegionIdent {
catalog: "a".to_string(),
schema: "b".to_string(),
table: "c".to_string(),
region_id,
})
.collect(),
heartbeat_time: start + i * 1000 + rng.gen_range(0..100),
})
.collect::<Vec<_>>()
}
let heartbeats = generate_heartbeats(100, vec![1, 2, 3]);
let last_heartbeat_time = heartbeats.last().unwrap().heartbeat_time;
for heartbeat in heartbeats {
runner.send_heartbeat(heartbeat).await;
}
let dump = runner.dump().await;
let failure_detectors = dump.iter().collect::<Vec<_>>();
assert_eq!(failure_detectors.len(), 3);
failure_detectors.iter().for_each(|e| {
let fd = e.failure_detector();
let acceptable_heartbeat_pause_millis = fd.acceptable_heartbeat_pause_millis() as i64;
let start = last_heartbeat_time;
// Within the "acceptable_heartbeat_pause_millis" period, phi is zero ...
for i in 1..=acceptable_heartbeat_pause_millis / 1000 {
let now = start + i * 1000;
assert_eq!(fd.phi(now), 0.0);
}
// ... then in less than two seconds, phi is above the threshold.
// The same effect can be seen in the diagrams in Akka's documentation.
let now = start + acceptable_heartbeat_pause_millis + 1000;
assert!(fd.phi(now) < fd.threshold() as _);
let now = start + acceptable_heartbeat_pause_millis + 2000;
assert!(fd.phi(now) > fd.threshold() as _);
});
runner.abort();
}
}

View File

@@ -31,6 +31,7 @@ impl HeartbeatHandler for OnLeaderStartHandler {
) -> Result<()> {
if let Some(election) = &ctx.election {
if election.in_infancy() {
ctx.is_infancy = true;
ctx.reset_in_memory();
}
}

View File

@@ -13,14 +13,20 @@
// limitations under the License.
use api::v1::meta::{HeartbeatRequest, PutRequest};
use dashmap::DashMap;
use crate::error::Result;
use crate::handler::node_stat::Stat;
use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
use crate::keys::StatValue;
use crate::keys::{StatKey, StatValue};
use crate::metasrv::Context;
const MAX_CACHED_STATS_PER_KEY: usize = 10;
#[derive(Default)]
pub struct PersistStatsHandler;
pub struct PersistStatsHandler {
stats_cache: DashMap<StatKey, Vec<Stat>>,
}
#[async_trait::async_trait]
impl HeartbeatHandler for PersistStatsHandler {
@@ -30,18 +36,25 @@ impl HeartbeatHandler for PersistStatsHandler {
ctx: &mut Context,
acc: &mut HeartbeatAccumulator,
) -> Result<()> {
if ctx.is_skip_all() || acc.stats.is_empty() {
if ctx.is_skip_all() {
return Ok(());
}
let stats = &mut acc.stats;
let key = match stats.get(0) {
Some(stat) => stat.stat_key(),
None => return Ok(()),
};
let Some(stat) = acc.stat.take() else { return Ok(()) };
// take stats from &mut acc.stats, avoid clone of vec
let stats = std::mem::take(stats);
let key = stat.stat_key();
let mut entry = self
.stats_cache
.entry(key)
.or_insert_with(|| Vec::with_capacity(MAX_CACHED_STATS_PER_KEY));
let stats = entry.value_mut();
stats.push(stat);
if stats.len() < MAX_CACHED_STATS_PER_KEY {
return Ok(());
}
let stats = stats.drain(..).collect();
let val = StatValue { stats };
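
The condition above releases one batch per MAX_CACHED_STATS_PER_KEY heartbeats for each stat key. A standalone miniature of that push-then-drain pattern (the helper is illustrative, not part of the patch):

fn push_and_maybe_flush(buf: &mut Vec<u64>, item: u64, n: usize) -> Option<Vec<u64>> {
    buf.push(item);
    // Drain the whole buffer as one batch once it reaches n items.
    (buf.len() >= n).then(|| buf.drain(..).collect())
}

fn main() {
    let mut buf = Vec::with_capacity(3);
    assert!(push_and_maybe_flush(&mut buf, 1, 3).is_none());
    assert!(push_and_maybe_flush(&mut buf, 2, 3).is_none());
    assert_eq!(push_and_maybe_flush(&mut buf, 3, 3), Some(vec![1, 2, 3]));
    assert!(buf.is_empty());
}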
@@ -65,7 +78,6 @@ mod tests {
use api::v1::meta::RangeRequest;
use super::*;
use crate::handler::node_stat::Stat;
use crate::keys::StatKey;
use crate::service::store::memory::MemStore;
@@ -83,24 +95,23 @@ mod tests {
catalog: None,
schema: None,
table: None,
is_infancy: false,
};
let req = HeartbeatRequest::default();
let mut acc = HeartbeatAccumulator {
stats: vec![Stat {
cluster_id: 3,
id: 101,
region_num: Some(100),
let handler = PersistStatsHandler::default();
for i in 1..=MAX_CACHED_STATS_PER_KEY {
let mut acc = HeartbeatAccumulator {
stat: Some(Stat {
cluster_id: 3,
id: 101,
region_num: Some(i as _),
..Default::default()
}),
..Default::default()
}],
..Default::default()
};
let stats_handler = PersistStatsHandler;
stats_handler
.handle(&req, &mut ctx, &mut acc)
.await
.unwrap();
};
handler.handle(&req, &mut ctx, &mut acc).await.unwrap();
}
let key = StatKey {
cluster_id: 3,
@@ -124,7 +135,7 @@ mod tests {
let val: StatValue = kv.value.clone().try_into().unwrap();
assert_eq!(1, val.stats.len());
assert_eq!(Some(100), val.stats[0].region_num);
assert_eq!(10, val.stats.len());
assert_eq!(Some(1), val.stats[0].region_num);
}
}

View File

@@ -65,6 +65,7 @@ mod tests {
catalog: None,
schema: None,
table: None,
is_infancy: false,
};
let req = HeartbeatRequest {

View File

@@ -178,7 +178,7 @@ pub(crate) fn to_removed_key(key: &str) -> String {
format!("{REMOVED_PREFIX}-{key}")
}
#[derive(Eq, PartialEq, Debug, Clone, Hash)]
#[derive(Eq, PartialEq, Debug, Clone, Hash, Copy)]
pub struct StatKey {
pub cluster_id: u64,
pub node_id: u64,

View File

@@ -17,8 +17,6 @@ pub mod bootstrap;
pub mod cluster;
pub mod election;
pub mod error;
// TODO(LFC): TBC
#[allow(dead_code)]
mod failure_detector;
pub mod handler;
pub mod keys;

View File

@@ -66,6 +66,7 @@ pub struct Context {
pub catalog: Option<String>,
pub schema: Option<String>,
pub table: Option<String>,
pub is_infancy: bool,
}
impl Context {
@@ -199,6 +200,7 @@ impl MetaSrv {
catalog: None,
schema: None,
table: None,
is_infancy: false,
}
}
}

View File

@@ -18,7 +18,7 @@ use std::sync::Arc;
use crate::cluster::MetaPeerClient;
use crate::handler::{
CheckLeaderHandler, CollectStatsHandler, HeartbeatHandlerGroup, KeepLeaseHandler,
OnLeaderStartHandler, PersistStatsHandler, ResponseHeaderHandler,
OnLeaderStartHandler, PersistStatsHandler, RegionFailureHandler, ResponseHeaderHandler,
};
use crate::lock::DistLockRef;
use crate::metasrv::{ElectionRef, MetaSrv, MetaSrvOptions, SelectorRef, TABLE_ID_SEQ};
@@ -118,6 +118,9 @@ impl MetaSrvBuilder {
let handler_group = match handler_group {
Some(handler_group) => handler_group,
None => {
let mut region_failure_handler = RegionFailureHandler::new(election.clone());
region_failure_handler.start().await;
let group = HeartbeatHandlerGroup::default();
let keep_lease_handler = KeepLeaseHandler::new(kv_store.clone());
group.add_handler(ResponseHeaderHandler::default()).await;
@@ -127,7 +130,8 @@ impl MetaSrvBuilder {
group.add_handler(keep_lease_handler).await;
group.add_handler(CheckLeaderHandler::default()).await;
group.add_handler(OnLeaderStartHandler::default()).await;
group.add_handler(CollectStatsHandler::default()).await;
group.add_handler(CollectStatsHandler).await;
group.add_handler(region_failure_handler).await;
group.add_handler(PersistStatsHandler::default()).await;
group
}