From 0f160a73be873ff30cec731627c06d000f781803 Mon Sep 17 00:00:00 2001 From: LFC Date: Fri, 24 Mar 2023 12:28:34 +0800 Subject: [PATCH] feat: metasrv collects datanode heartbeats for region failure detection (#1214) * feat: metasrv collects datanode heartbeats for region failure detection * chore: change visibility * fix: fragile tests * Update src/meta-srv/src/handler/persist_stats_handler.rs Co-authored-by: fys <40801205+Fengys123@users.noreply.github.com> * Update src/meta-srv/src/handler/failure_handler.rs Co-authored-by: fys <40801205+Fengys123@users.noreply.github.com> * fix: resolve PR comments * fix: resolve PR comments * fix: resolve PR comments --------- Co-authored-by: shuiyisong Co-authored-by: fys <40801205+Fengys123@users.noreply.github.com> --- src/catalog/src/lib.rs | 42 +-- src/datanode/src/heartbeat.rs | 10 +- src/meta-srv/src/cluster.rs | 2 +- src/meta-srv/src/failure_detector.rs | 105 +++--- src/meta-srv/src/handler.rs | 4 +- .../src/handler/collect_stats_handler.rs | 42 +-- src/meta-srv/src/handler/failure_handler.rs | 151 +++++++++ .../src/handler/failure_handler/runner.rs | 309 ++++++++++++++++++ src/meta-srv/src/handler/on_leader_start.rs | 1 + .../src/handler/persist_stats_handler.rs | 65 ++-- .../src/handler/response_header_handler.rs | 1 + src/meta-srv/src/keys.rs | 2 +- src/meta-srv/src/lib.rs | 2 - src/meta-srv/src/metasrv.rs | 2 + src/meta-srv/src/metasrv/builder.rs | 8 +- 15 files changed, 571 insertions(+), 175 deletions(-) create mode 100644 src/meta-srv/src/handler/failure_handler.rs create mode 100644 src/meta-srv/src/handler/failure_handler/runner.rs diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index a07c671586..84f70044c3 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use api::v1::meta::{RegionStat, TableName}; use common_telemetry::{info, warn}; -use snafu::{OptionExt, ResultExt}; +use snafu::ResultExt; use table::engine::{EngineContext, TableEngineRef}; use table::metadata::TableId; use table::requests::CreateTableRequest; @@ -228,34 +228,25 @@ pub(crate) async fn handle_system_table_request<'a, M: CatalogManager>( /// The stat of regions in the datanode node. /// The number of regions can be got from len of vec. -pub async fn datanode_stat(catalog_manager: &CatalogManagerRef) -> Result<(u64, Vec)> { +/// +/// Ignores any errors occurred during iterating regions. The intention of this method is to +/// collect region stats that will be carried in Datanode's heartbeat to Metasrv, so it's a +/// "try our best" job. +pub async fn datanode_stat(catalog_manager: &CatalogManagerRef) -> (u64, Vec) { let mut region_number: u64 = 0; let mut region_stats = Vec::new(); - for catalog_name in catalog_manager.catalog_names()? { - let catalog = - catalog_manager - .catalog(&catalog_name)? - .context(error::CatalogNotFoundSnafu { - catalog_name: &catalog_name, - })?; + let Ok(catalog_names) = catalog_manager.catalog_names() else { return (region_number, region_stats) }; + for catalog_name in catalog_names { + let Ok(Some(catalog)) = catalog_manager.catalog(&catalog_name) else { continue }; - for schema_name in catalog.schema_names()? { - let schema = catalog - .schema(&schema_name)? - .context(error::SchemaNotFoundSnafu { - catalog: &catalog_name, - schema: &schema_name, - })?; + let Ok(schema_names) = catalog.schema_names() else { continue }; + for schema_name in schema_names { + let Ok(Some(schema)) = catalog.schema(&schema_name) else { continue }; - for table_name in schema.table_names()? 
{ - let table = - schema - .table(&table_name) - .await? - .context(error::TableNotFoundSnafu { - table_info: &table_name, - })?; + let Ok(table_names) = schema.table_names() else { continue }; + for table_name in table_names { + let Ok(Some(table)) = schema.table(&table_name).await else { continue }; let region_numbers = &table.table_info().meta.region_numbers; region_number += region_numbers.len() as u64; @@ -282,6 +273,5 @@ pub async fn datanode_stat(catalog_manager: &CatalogManagerRef) -> Result<(u64, } } } - - Ok((region_number, region_stats)) + (region_number, region_stats) } diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index ddc3ce30c2..9cd0839692 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -106,13 +106,7 @@ impl HeartbeatTask { let mut tx = Self::create_streams(&meta_client, running.clone()).await?; common_runtime::spawn_bg(async move { while running.load(Ordering::Acquire) { - let (region_num, region_stats) = match datanode_stat(&catalog_manager_clone).await { - Ok(datanode_stat) => (datanode_stat.0 as i64, datanode_stat.1), - Err(e) => { - error!("failed to get region status, err: {e:?}"); - (-1, vec![]) - } - }; + let (region_num, region_stats) = datanode_stat(&catalog_manager_clone).await; let req = HeartbeatRequest { peer: Some(Peer { @@ -120,7 +114,7 @@ impl HeartbeatTask { addr: addr.clone(), }), node_stat: Some(NodeStat { - region_num, + region_num: region_num as _, ..Default::default() }), region_stats, diff --git a/src/meta-srv/src/cluster.rs b/src/meta-srv/src/cluster.rs index 297cda860a..1c63ad5cef 100644 --- a/src/meta-srv/src/cluster.rs +++ b/src/meta-srv/src/cluster.rs @@ -261,7 +261,7 @@ mod tests { let stat_val = StatValue { stats: vec![stat] }.try_into().unwrap(); let kv = KeyValue { - key: stat_key.clone().into(), + key: stat_key.into(), value: stat_val, }; diff --git a/src/meta-srv/src/failure_detector.rs b/src/meta-srv/src/failure_detector.rs index 8167d3a5a3..cd56c5e92b 100644 --- a/src/meta-srv/src/failure_detector.rs +++ b/src/meta-srv/src/failure_detector.rs @@ -32,30 +32,27 @@ use std::collections::VecDeque; /// /// where F is the cumulative distribution function of a normal distribution with mean /// and standard deviation estimated from historical heartbeat inter-arrival times. +#[cfg_attr(test, derive(Clone))] pub(crate) struct PhiAccrualFailureDetector { /// A low threshold is prone to generate many wrong suspicions but ensures a quick detection /// in the event of a real crash. Conversely, a high threshold generates fewer mistakes but /// needs more time to detect actual crashes. - threshold: f64, - - /// Number of samples to use for calculation of mean and standard deviation of inter-arrival - /// times. - max_sample_size: u32, + threshold: f32, /// Minimum standard deviation to use for the normal distribution used when calculating phi. /// Too low standard deviation might result in too much sensitivity for sudden, but normal, /// deviations in heartbeat inter arrival times. - min_std_deviation_millis: f64, + min_std_deviation_millis: f32, /// Duration corresponding to number of potentially lost/delayed heartbeats that will be /// accepted before considering it to be an anomaly. /// This margin is important to be able to survive sudden, occasional, pauses in heartbeat /// arrivals, due to for example network drop. 
- acceptable_heartbeat_pause_millis: i64, + acceptable_heartbeat_pause_millis: u32, /// Bootstrap the stats with heartbeats that corresponds to this duration, with a rather high /// standard deviation (since environment is unknown in the beginning). - first_heartbeat_estimate_millis: i64, + first_heartbeat_estimate_millis: u32, heartbeat_history: HeartbeatHistory, last_heartbeat_millis: Option, @@ -65,14 +62,12 @@ impl Default for PhiAccrualFailureDetector { fn default() -> Self { // default configuration is the same as of Akka: // https://github.com/akka/akka/blob/main/akka-cluster/src/main/resources/reference.conf#L181 - let max_sample_size = 1000; Self { - threshold: 8_f64, - max_sample_size, - min_std_deviation_millis: 100_f64, + threshold: 8_f32, + min_std_deviation_millis: 100_f32, acceptable_heartbeat_pause_millis: 3000, first_heartbeat_estimate_millis: 1000, - heartbeat_history: HeartbeatHistory::new(max_sample_size), + heartbeat_history: HeartbeatHistory::new(1000), last_heartbeat_millis: None, } } @@ -95,28 +90,28 @@ impl PhiAccrualFailureDetector { // bootstrap with 2 entries with rather high standard deviation let std_deviation = self.first_heartbeat_estimate_millis / 4; self.heartbeat_history - .add(self.first_heartbeat_estimate_millis - std_deviation); + .add((self.first_heartbeat_estimate_millis - std_deviation) as _); self.heartbeat_history - .add(self.first_heartbeat_estimate_millis + std_deviation); + .add((self.first_heartbeat_estimate_millis + std_deviation) as _); } let _ = self.last_heartbeat_millis.insert(ts_millis); } pub(crate) fn is_available(&self, ts_millis: i64) -> bool { - self.phi(ts_millis) < self.threshold + self.phi(ts_millis) < self.threshold as _ } /// The suspicion level of the accrual failure detector. /// /// If a connection does not have any records in failure detector then it is considered healthy. - fn phi(&self, ts_millis: i64) -> f64 { + pub(crate) fn phi(&self, ts_millis: i64) -> f64 { if let Some(last_heartbeat_millis) = self.last_heartbeat_millis { let time_diff = ts_millis - last_heartbeat_millis; let mean = self.heartbeat_history.mean(); let std_deviation = self .heartbeat_history .std_deviation() - .max(self.min_std_deviation_millis); + .max(self.min_std_deviation_millis as _); phi( time_diff, @@ -128,6 +123,16 @@ impl PhiAccrualFailureDetector { 0.0 } } + + #[cfg(test)] + pub(crate) fn threshold(&self) -> f32 { + self.threshold + } + + #[cfg(test)] + pub(crate) fn acceptable_heartbeat_pause_millis(&self) -> u32 { + self.acceptable_heartbeat_pause_millis + } } /// Calculation of phi, derived from the Cumulative distribution function for @@ -141,6 +146,8 @@ impl PhiAccrualFailureDetector { /// Usually phi = 1 means likeliness that we will make a mistake is about 10%. /// The likeliness is about 1% with phi = 2, 0.1% with phi = 3 and so on. fn phi(time_diff: i64, mean: f64, std_deviation: f64) -> f64 { + assert_ne!(std_deviation, 0.0); + let time_diff = time_diff as f64; let y = (time_diff - mean) / std_deviation; let e = (-y * (1.5976 + 0.070566 * y * y)).exp(); @@ -155,8 +162,12 @@ fn phi(time_diff: i64, mean: f64, std_deviation: f64) -> f64 { /// It is capped by the number of samples specified in `max_sample_size`. /// /// The stats (mean, variance, std_deviation) are not defined for empty HeartbeatHistory. +#[derive(Clone)] struct HeartbeatHistory { + /// Number of samples to use for calculation of mean and standard deviation of inter-arrival + /// times. 
max_sample_size: u32, + intervals: VecDeque, interval_sum: i64, squared_interval_sum: i64, @@ -198,7 +209,7 @@ impl HeartbeatHistory { let oldest = self .intervals .pop_front() - .expect("intervals must not empty here"); + .expect("intervals must not be empty here"); self.interval_sum -= oldest; self.squared_interval_sum -= oldest * oldest; } @@ -207,42 +218,9 @@ impl HeartbeatHistory { #[cfg(test)] mod tests { use common_time::util::current_time_millis; - use rand::Rng; use super::*; - #[test] - fn test_heartbeat() { - // Generate 2000 heartbeats start from now. Heartbeat interval is one second, plus some - // random millis. - fn generate_heartbeats() -> Vec { - let mut rng = rand::thread_rng(); - let start = current_time_millis(); - (0..2000) - .map(|i| start + i * 1000 + rng.gen_range(0..100)) - .collect::>() - } - let heartbeats = generate_heartbeats(); - - let mut fd = PhiAccrualFailureDetector::default(); - // feed the failure detector with these heartbeats - heartbeats.iter().for_each(|x| fd.heartbeat(*x)); - - let start = *heartbeats.last().unwrap(); - // Within the "acceptable_heartbeat_pause_millis" period, phi is zero ... - for i in 1..=fd.acceptable_heartbeat_pause_millis / 1000 { - let now = start + i * 1000; - assert_eq!(fd.phi(now), 0.0); - } - - // ... then in less than two seconds, phi is above the threshold. - // The same effect can be seen in the diagrams in Akka's document. - let now = start + fd.acceptable_heartbeat_pause_millis + 1000; - assert!(fd.phi(now) < fd.threshold); - let now = start + fd.acceptable_heartbeat_pause_millis + 2000; - assert!(fd.phi(now) > fd.threshold); - } - #[test] fn test_is_available() { let ts_millis = current_time_millis(); @@ -254,12 +232,13 @@ mod tests { fd.heartbeat(ts_millis); + let acceptable_heartbeat_pause_millis = fd.acceptable_heartbeat_pause_millis as i64; // is available when heartbeat assert!(fd.is_available(ts_millis)); // is available before heartbeat timeout - assert!(fd.is_available(ts_millis + fd.acceptable_heartbeat_pause_millis / 2)); + assert!(fd.is_available(ts_millis + acceptable_heartbeat_pause_millis / 2)); // is not available after heartbeat timeout - assert!(!fd.is_available(ts_millis + fd.acceptable_heartbeat_pause_millis * 2)); + assert!(!fd.is_available(ts_millis + acceptable_heartbeat_pause_millis * 2)); } #[test] @@ -286,14 +265,15 @@ mod tests { fd.heartbeat(ts_millis); + let acceptable_heartbeat_pause_millis = fd.acceptable_heartbeat_pause_millis as i64; // phi == 0 when heartbeat assert_eq!(fd.phi(ts_millis), 0.0); // phi < threshold before heartbeat timeout - let now = ts_millis + fd.acceptable_heartbeat_pause_millis / 2; - assert!(fd.phi(now) < fd.threshold); + let now = ts_millis + acceptable_heartbeat_pause_millis / 2; + assert!(fd.phi(now) < fd.threshold as _); // phi >= threshold after heartbeat timeout - let now = ts_millis + fd.acceptable_heartbeat_pause_millis * 2; - assert!(fd.phi(now) >= fd.threshold); + let now = ts_millis + acceptable_heartbeat_pause_millis * 2; + assert!(fd.phi(now) >= fd.threshold as _); } // The following test cases are port from Akka's test: @@ -349,7 +329,6 @@ mod tests { fn test_return_phi_of_0_on_startup_when_no_heartbeats() { let fd = PhiAccrualFailureDetector { threshold: 8.0, - max_sample_size: 1000, min_std_deviation_millis: 100.0, acceptable_heartbeat_pause_millis: 0, first_heartbeat_estimate_millis: 1000, @@ -364,7 +343,6 @@ mod tests { fn test_return_phi_based_on_guess_when_only_one_heartbeat() { let mut fd = PhiAccrualFailureDetector { threshold: 8.0, - 
max_sample_size: 1000, min_std_deviation_millis: 100.0, acceptable_heartbeat_pause_millis: 0, first_heartbeat_estimate_millis: 1000, @@ -381,7 +359,6 @@ mod tests { fn test_return_phi_using_first_interval_after_second_heartbeat() { let mut fd = PhiAccrualFailureDetector { threshold: 8.0, - max_sample_size: 1000, min_std_deviation_millis: 100.0, acceptable_heartbeat_pause_millis: 0, first_heartbeat_estimate_millis: 1000, @@ -398,7 +375,6 @@ mod tests { fn test_is_available_after_a_series_of_successful_heartbeats() { let mut fd = PhiAccrualFailureDetector { threshold: 8.0, - max_sample_size: 1000, min_std_deviation_millis: 100.0, acceptable_heartbeat_pause_millis: 0, first_heartbeat_estimate_millis: 1000, @@ -417,7 +393,6 @@ mod tests { fn test_is_not_available_if_heartbeat_are_missed() { let mut fd = PhiAccrualFailureDetector { threshold: 3.0, - max_sample_size: 1000, min_std_deviation_millis: 100.0, acceptable_heartbeat_pause_millis: 0, first_heartbeat_estimate_millis: 1000, @@ -436,7 +411,6 @@ mod tests { ) { let mut fd = PhiAccrualFailureDetector { threshold: 8.0, - max_sample_size: 1000, min_std_deviation_millis: 100.0, acceptable_heartbeat_pause_millis: 3000, first_heartbeat_estimate_millis: 1000, @@ -476,7 +450,6 @@ mod tests { fn test_accept_some_configured_missing_heartbeats() { let mut fd = PhiAccrualFailureDetector { threshold: 8.0, - max_sample_size: 1000, min_std_deviation_millis: 100.0, acceptable_heartbeat_pause_millis: 3000, first_heartbeat_estimate_millis: 1000, @@ -496,7 +469,6 @@ mod tests { fn test_fail_after_configured_acceptable_missing_heartbeats() { let mut fd = PhiAccrualFailureDetector { threshold: 8.0, - max_sample_size: 1000, min_std_deviation_millis: 100.0, acceptable_heartbeat_pause_millis: 3000, first_heartbeat_estimate_millis: 1000, @@ -518,7 +490,6 @@ mod tests { fn test_use_max_sample_size_heartbeats() { let mut fd = PhiAccrualFailureDetector { threshold: 8.0, - max_sample_size: 3, min_std_deviation_millis: 100.0, acceptable_heartbeat_pause_millis: 0, first_heartbeat_estimate_millis: 1000, diff --git a/src/meta-srv/src/handler.rs b/src/meta-srv/src/handler.rs index 62a3d26483..d878d9103f 100644 --- a/src/meta-srv/src/handler.rs +++ b/src/meta-srv/src/handler.rs @@ -14,6 +14,7 @@ pub use check_leader_handler::CheckLeaderHandler; pub use collect_stats_handler::CollectStatsHandler; +pub use failure_handler::RegionFailureHandler; pub use keep_lease_handler::KeepLeaseHandler; pub use on_leader_start::OnLeaderStartHandler; pub use persist_stats_handler::PersistStatsHandler; @@ -21,6 +22,7 @@ pub use response_header_handler::ResponseHeaderHandler; mod check_leader_handler; mod collect_stats_handler; +mod failure_handler; mod instruction; mod keep_lease_handler; pub mod node_stat; @@ -54,8 +56,8 @@ pub trait HeartbeatHandler: Send + Sync { #[derive(Debug, Default)] pub struct HeartbeatAccumulator { pub header: Option, - pub stats: Vec, pub instructions: Vec, + pub stat: Option, } impl HeartbeatAccumulator { diff --git a/src/meta-srv/src/handler/collect_stats_handler.rs b/src/meta-srv/src/handler/collect_stats_handler.rs index d5c34222d8..a1274fa5b7 100644 --- a/src/meta-srv/src/handler/collect_stats_handler.rs +++ b/src/meta-srv/src/handler/collect_stats_handler.rs @@ -12,39 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::collections::VecDeque; - use api::v1::meta::HeartbeatRequest; use common_telemetry::debug; -use dashmap::mapref::entry::Entry; -use dashmap::DashMap; use super::node_stat::Stat; use crate::error::Result; use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; use crate::metasrv::Context; -type StatKey = (u64, u64); - -pub struct CollectStatsHandler { - max_cached_stats_per_key: usize, - cache: DashMap>, -} - -impl Default for CollectStatsHandler { - fn default() -> Self { - Self::new(10) - } -} - -impl CollectStatsHandler { - pub fn new(max_cached_stats_per_key: usize) -> Self { - Self { - max_cached_stats_per_key, - cache: DashMap::new(), - } - } -} +pub struct CollectStatsHandler; #[async_trait::async_trait] impl HeartbeatHandler for CollectStatsHandler { @@ -60,21 +36,7 @@ impl HeartbeatHandler for CollectStatsHandler { match Stat::try_from(req.clone()) { Ok(stat) => { - let key = (stat.cluster_id, stat.id); - match self.cache.entry(key) { - Entry::Occupied(mut e) => { - let deque = e.get_mut(); - deque.push_front(stat); - if deque.len() >= self.max_cached_stats_per_key { - acc.stats = deque.drain(..).collect(); - } - } - Entry::Vacant(e) => { - let mut stat_vec = VecDeque::with_capacity(self.max_cached_stats_per_key); - stat_vec.push_front(stat); - e.insert(stat_vec); - } - } + let _ = acc.stat.insert(stat); } Err(_) => { debug!("Incomplete heartbeat data: {:?}", req); diff --git a/src/meta-srv/src/handler/failure_handler.rs b/src/meta-srv/src/handler/failure_handler.rs new file mode 100644 index 0000000000..2593be500b --- /dev/null +++ b/src/meta-srv/src/handler/failure_handler.rs @@ -0,0 +1,151 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +mod runner; + +use api::v1::meta::HeartbeatRequest; +use async_trait::async_trait; + +use crate::error::Result; +use crate::handler::failure_handler::runner::{FailureDetectControl, FailureDetectRunner}; +use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; +use crate::metasrv::{Context, ElectionRef}; + +#[derive(Eq, Hash, PartialEq, Clone)] +pub(crate) struct RegionIdent { + catalog: String, + schema: String, + table: String, + region_id: u64, +} + +// TODO(LFC): TBC +pub(crate) struct DatanodeHeartbeat { + #[allow(dead_code)] + cluster_id: u64, + #[allow(dead_code)] + node_id: u64, + region_idents: Vec, + heartbeat_time: i64, +} + +pub struct RegionFailureHandler { + failure_detect_runner: FailureDetectRunner, +} + +impl RegionFailureHandler { + pub fn new(election: Option) -> Self { + Self { + failure_detect_runner: FailureDetectRunner::new(election), + } + } + + pub async fn start(&mut self) { + self.failure_detect_runner.start().await; + } +} + +#[async_trait] +impl HeartbeatHandler for RegionFailureHandler { + async fn handle( + &self, + _: &HeartbeatRequest, + ctx: &mut Context, + acc: &mut HeartbeatAccumulator, + ) -> Result<()> { + if ctx.is_infancy { + self.failure_detect_runner + .send_control(FailureDetectControl::Purge) + .await; + } + + if ctx.is_skip_all() { + return Ok(()); + } + + let Some(stat) = acc.stat.as_ref() else { return Ok(()) }; + + let heartbeat = DatanodeHeartbeat { + cluster_id: stat.cluster_id, + node_id: stat.id, + region_idents: stat + .region_stats + .iter() + .map(|x| RegionIdent { + catalog: x.catalog.clone(), + schema: x.schema.clone(), + table: x.table.clone(), + region_id: x.id, + }) + .collect(), + heartbeat_time: stat.timestamp_millis, + }; + + self.failure_detect_runner.send_heartbeat(heartbeat).await; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::handler::node_stat::{RegionStat, Stat}; + use crate::metasrv::builder::MetaSrvBuilder; + + #[tokio::test(flavor = "multi_thread")] + async fn test_handle_heartbeat() { + let mut handler = RegionFailureHandler::new(None); + handler.start().await; + + let req = &HeartbeatRequest::default(); + + let builder = MetaSrvBuilder::new(); + let metasrv = builder.build().await; + let mut ctx = metasrv.new_ctx(); + ctx.is_infancy = false; + + let acc = &mut HeartbeatAccumulator::default(); + fn new_region_stat(region_id: u64) -> RegionStat { + RegionStat { + id: region_id, + catalog: "a".to_string(), + schema: "b".to_string(), + table: "c".to_string(), + rcus: 0, + wcus: 0, + approximate_bytes: 0, + approximate_rows: 0, + } + } + acc.stat = Some(Stat { + cluster_id: 1, + id: 42, + region_stats: vec![new_region_stat(1), new_region_stat(2), new_region_stat(3)], + timestamp_millis: 1000, + ..Default::default() + }); + + handler.handle(req, &mut ctx, acc).await.unwrap(); + + let dump = handler.failure_detect_runner.dump().await; + assert_eq!(dump.iter().collect::>().len(), 3); + + // infancy makes heartbeats re-accumulated + ctx.is_infancy = true; + acc.stat = None; + handler.handle(req, &mut ctx, acc).await.unwrap(); + let dump = handler.failure_detect_runner.dump().await; + assert_eq!(dump.iter().collect::>().len(), 0); + } +} diff --git a/src/meta-srv/src/handler/failure_handler/runner.rs b/src/meta-srv/src/handler/failure_handler/runner.rs new file mode 100644 index 0000000000..d0b517c1d1 --- /dev/null +++ b/src/meta-srv/src/handler/failure_handler/runner.rs @@ -0,0 +1,309 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); 
+// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::ops::DerefMut; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use common_telemetry::error; +use common_time::util::current_time_millis; +use dashmap::mapref::multiple::RefMulti; +use dashmap::DashMap; +use tokio::sync::mpsc; +use tokio::sync::mpsc::{Receiver, Sender}; +use tokio::task::JoinHandle; + +use crate::failure_detector::PhiAccrualFailureDetector; +use crate::handler::failure_handler::{DatanodeHeartbeat, RegionIdent}; +use crate::metasrv::ElectionRef; + +pub(crate) enum FailureDetectControl { + Purge, + + #[cfg(test)] + Dump(tokio::sync::oneshot::Sender), +} + +pub(crate) struct FailureDetectRunner { + election: Option, + + heartbeat_tx: Sender, + heartbeat_rx: Option>, + + control_tx: Sender, + control_rx: Option>, + + receiver_handle: Option>, + runner_handle: Option>, +} + +impl FailureDetectRunner { + pub(crate) fn new(election: Option) -> Self { + let (heartbeat_tx, heartbeat_rx) = mpsc::channel::(1024); + let (control_tx, control_rx) = mpsc::channel::(1024); + Self { + election, + heartbeat_tx, + heartbeat_rx: Some(heartbeat_rx), + control_tx, + control_rx: Some(control_rx), + receiver_handle: None, + runner_handle: None, + } + } + + pub(crate) async fn send_heartbeat(&self, heartbeat: DatanodeHeartbeat) { + if let Err(e) = self.heartbeat_tx.send(heartbeat).await { + error!("FailureDetectRunner is stop receiving heartbeats: {}", e) + } + } + + pub(crate) async fn send_control(&self, control: FailureDetectControl) { + if let Err(e) = self.control_tx.send(control).await { + error!("FailureDetectRunner is stop receiving controls: {}", e) + } + } + + pub(crate) async fn start(&mut self) { + let failure_detectors = Arc::new(FailureDetectorContainer(DashMap::new())); + self.start_with(failure_detectors).await + } + + async fn start_with(&mut self, failure_detectors: Arc) { + let Some(mut heartbeat_rx) = self.heartbeat_rx.take() else { return }; + let Some(mut control_rx) = self.control_rx.take() else { return }; + + let container = failure_detectors.clone(); + let receiver_handle = common_runtime::spawn_bg(async move { + loop { + tokio::select! { + Some(control) = control_rx.recv() => { + match control { + FailureDetectControl::Purge => container.clear(), + + #[cfg(test)] + FailureDetectControl::Dump(tx) => { + // Drain any heartbeats that are not handled before dump. 
+ while let Ok(heartbeat) = heartbeat_rx.try_recv() { + for ident in heartbeat.region_idents { + let mut detector = container.get_failure_detector(ident); + detector.heartbeat(heartbeat.heartbeat_time); + } + } + let _ = tx.send(container.dump()); + } + } + } + Some(heartbeat) = heartbeat_rx.recv() => { + for ident in heartbeat.region_idents { + let mut detector = container.get_failure_detector(ident); + detector.heartbeat(heartbeat.heartbeat_time); + } + } + } + } + }); + self.receiver_handle = Some(receiver_handle); + + let election = self.election.clone(); + let runner_handle = common_runtime::spawn_bg(async move { + loop { + let start = Instant::now(); + + let is_leader = election.as_ref().map(|x| x.is_leader()).unwrap_or(true); + if is_leader { + for e in failure_detectors.iter() { + if e.failure_detector().is_available(current_time_millis()) { + // TODO(LFC): TBC + } + } + } + + let elapsed = Instant::now().duration_since(start); + if let Some(sleep) = Duration::from_secs(1).checked_sub(elapsed) { + tokio::time::sleep(sleep).await; + } // else the elapsed time is exceeding one second, we should continue working immediately + } + }); + self.runner_handle = Some(runner_handle); + } + + #[cfg(test)] + fn abort(&mut self) { + let Some(handle) = self.receiver_handle.take() else { return }; + handle.abort(); + + let Some(handle) = self.runner_handle.take() else { return }; + handle.abort(); + } + + #[cfg(test)] + pub(crate) async fn dump(&self) -> FailureDetectorContainer { + let (tx, rx) = tokio::sync::oneshot::channel(); + self.send_control(FailureDetectControl::Dump(tx)).await; + rx.await.unwrap() + } +} + +pub(crate) struct FailureDetectorEntry<'a> { + e: RefMulti<'a, RegionIdent, PhiAccrualFailureDetector>, +} + +impl FailureDetectorEntry<'_> { + fn failure_detector(&self) -> &PhiAccrualFailureDetector { + self.e.value() + } +} + +pub(crate) struct FailureDetectorContainer(DashMap); + +impl FailureDetectorContainer { + fn get_failure_detector( + &self, + ident: RegionIdent, + ) -> impl DerefMut + '_ { + self.0 + .entry(ident) + .or_insert_with(PhiAccrualFailureDetector::default) + } + + pub(crate) fn iter(&self) -> Box + '_> { + Box::new(self.0.iter().map(move |e| FailureDetectorEntry { e })) as _ + } + + fn clear(&self) { + self.0.clear() + } + + #[cfg(test)] + fn dump(&self) -> FailureDetectorContainer { + let mut m = DashMap::with_capacity(self.0.len()); + m.extend(self.0.iter().map(|x| (x.key().clone(), x.value().clone()))); + Self(m) + } +} + +#[cfg(test)] +mod tests { + use rand::Rng; + + use super::*; + + #[test] + fn test_default_failure_detector_container() { + let container = FailureDetectorContainer(DashMap::new()); + let ident = RegionIdent { + catalog: "a".to_string(), + schema: "b".to_string(), + table: "c".to_string(), + region_id: 1, + }; + let _ = container.get_failure_detector(ident.clone()); + assert!(container.0.contains_key(&ident)); + + { + let mut iter = container.iter(); + assert!(iter.next().is_some()); + assert!(iter.next().is_none()); + } + + container.clear(); + assert!(container.0.is_empty()); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_control() { + let container = FailureDetectorContainer(DashMap::new()); + + let ident = RegionIdent { + catalog: "a".to_string(), + schema: "b".to_string(), + table: "c".to_string(), + region_id: 1, + }; + container.get_failure_detector(ident.clone()); + + let mut runner = FailureDetectRunner::new(None); + runner.start_with(Arc::new(container)).await; + + let dump = runner.dump().await; + 
assert_eq!(dump.iter().collect::>().len(), 1); + + runner.send_control(FailureDetectControl::Purge).await; + + let dump = runner.dump().await; + assert_eq!(dump.iter().collect::>().len(), 0); + + runner.abort(); + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_heartbeat() { + let mut runner = FailureDetectRunner::new(None); + runner.start().await; + + // Generate 2000 heartbeats start from now. Heartbeat interval is one second, plus some random millis. + fn generate_heartbeats(node_id: u64, region_ids: Vec) -> Vec { + let mut rng = rand::thread_rng(); + let start = current_time_millis(); + (0..2000) + .map(|i| DatanodeHeartbeat { + cluster_id: 1, + node_id, + region_idents: region_ids + .iter() + .map(|®ion_id| RegionIdent { + catalog: "a".to_string(), + schema: "b".to_string(), + table: "c".to_string(), + region_id, + }) + .collect(), + heartbeat_time: start + i * 1000 + rng.gen_range(0..100), + }) + .collect::>() + } + + let heartbeats = generate_heartbeats(100, vec![1, 2, 3]); + let last_heartbeat_time = heartbeats.last().unwrap().heartbeat_time; + for heartbeat in heartbeats { + runner.send_heartbeat(heartbeat).await; + } + + let dump = runner.dump().await; + let failure_detectors = dump.iter().collect::>(); + assert_eq!(failure_detectors.len(), 3); + + failure_detectors.iter().for_each(|e| { + let fd = e.failure_detector(); + let acceptable_heartbeat_pause_millis = fd.acceptable_heartbeat_pause_millis() as i64; + let start = last_heartbeat_time; + + // Within the "acceptable_heartbeat_pause_millis" period, phi is zero ... + for i in 1..=acceptable_heartbeat_pause_millis / 1000 { + let now = start + i * 1000; + assert_eq!(fd.phi(now), 0.0); + } + + // ... then in less than two seconds, phi is above the threshold. + // The same effect can be seen in the diagrams in Akka's document. + let now = start + acceptable_heartbeat_pause_millis + 1000; + assert!(fd.phi(now) < fd.threshold() as _); + let now = start + acceptable_heartbeat_pause_millis + 2000; + assert!(fd.phi(now) > fd.threshold() as _); + }); + + runner.abort(); + } +} diff --git a/src/meta-srv/src/handler/on_leader_start.rs b/src/meta-srv/src/handler/on_leader_start.rs index 8e733e9744..163be19a35 100644 --- a/src/meta-srv/src/handler/on_leader_start.rs +++ b/src/meta-srv/src/handler/on_leader_start.rs @@ -31,6 +31,7 @@ impl HeartbeatHandler for OnLeaderStartHandler { ) -> Result<()> { if let Some(election) = &ctx.election { if election.in_infancy() { + ctx.is_infancy = true; ctx.reset_in_memory(); } } diff --git a/src/meta-srv/src/handler/persist_stats_handler.rs b/src/meta-srv/src/handler/persist_stats_handler.rs index 0dff1fac1d..385f8c8fe6 100644 --- a/src/meta-srv/src/handler/persist_stats_handler.rs +++ b/src/meta-srv/src/handler/persist_stats_handler.rs @@ -13,14 +13,20 @@ // limitations under the License. 
use api::v1::meta::{HeartbeatRequest, PutRequest}; +use dashmap::DashMap; use crate::error::Result; +use crate::handler::node_stat::Stat; use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; -use crate::keys::StatValue; +use crate::keys::{StatKey, StatValue}; use crate::metasrv::Context; +const MAX_CACHED_STATS_PER_KEY: usize = 10; + #[derive(Default)] -pub struct PersistStatsHandler; +pub struct PersistStatsHandler { + stats_cache: DashMap>, +} #[async_trait::async_trait] impl HeartbeatHandler for PersistStatsHandler { @@ -30,18 +36,25 @@ impl HeartbeatHandler for PersistStatsHandler { ctx: &mut Context, acc: &mut HeartbeatAccumulator, ) -> Result<()> { - if ctx.is_skip_all() || acc.stats.is_empty() { + if ctx.is_skip_all() { return Ok(()); } - let stats = &mut acc.stats; - let key = match stats.get(0) { - Some(stat) => stat.stat_key(), - None => return Ok(()), - }; + let Some(stat) = acc.stat.take() else { return Ok(()) }; - // take stats from &mut acc.stats, avoid clone of vec - let stats = std::mem::take(stats); + let key = stat.stat_key(); + let mut entry = self + .stats_cache + .entry(key) + .or_insert_with(|| Vec::with_capacity(MAX_CACHED_STATS_PER_KEY)); + let stats = entry.value_mut(); + stats.push(stat); + + if stats.len() < MAX_CACHED_STATS_PER_KEY { + return Ok(()); + } + + let stats = stats.drain(..).collect(); let val = StatValue { stats }; @@ -65,7 +78,6 @@ mod tests { use api::v1::meta::RangeRequest; use super::*; - use crate::handler::node_stat::Stat; use crate::keys::StatKey; use crate::service::store::memory::MemStore; @@ -83,24 +95,23 @@ mod tests { catalog: None, schema: None, table: None, + is_infancy: false, }; let req = HeartbeatRequest::default(); - let mut acc = HeartbeatAccumulator { - stats: vec![Stat { - cluster_id: 3, - id: 101, - region_num: Some(100), + let handler = PersistStatsHandler::default(); + for i in 1..=MAX_CACHED_STATS_PER_KEY { + let mut acc = HeartbeatAccumulator { + stat: Some(Stat { + cluster_id: 3, + id: 101, + region_num: Some(i as _), + ..Default::default() + }), ..Default::default() - }], - ..Default::default() - }; - - let stats_handler = PersistStatsHandler; - stats_handler - .handle(&req, &mut ctx, &mut acc) - .await - .unwrap(); + }; + handler.handle(&req, &mut ctx, &mut acc).await.unwrap(); + } let key = StatKey { cluster_id: 3, @@ -124,7 +135,7 @@ mod tests { let val: StatValue = kv.value.clone().try_into().unwrap(); - assert_eq!(1, val.stats.len()); - assert_eq!(Some(100), val.stats[0].region_num); + assert_eq!(10, val.stats.len()); + assert_eq!(Some(1), val.stats[0].region_num); } } diff --git a/src/meta-srv/src/handler/response_header_handler.rs b/src/meta-srv/src/handler/response_header_handler.rs index dbbec2db59..e040d1e87e 100644 --- a/src/meta-srv/src/handler/response_header_handler.rs +++ b/src/meta-srv/src/handler/response_header_handler.rs @@ -65,6 +65,7 @@ mod tests { catalog: None, schema: None, table: None, + is_infancy: false, }; let req = HeartbeatRequest { diff --git a/src/meta-srv/src/keys.rs b/src/meta-srv/src/keys.rs index 4b9e031af0..cbebe55c75 100644 --- a/src/meta-srv/src/keys.rs +++ b/src/meta-srv/src/keys.rs @@ -178,7 +178,7 @@ pub(crate) fn to_removed_key(key: &str) -> String { format!("{REMOVED_PREFIX}-{key}") } -#[derive(Eq, PartialEq, Debug, Clone, Hash)] +#[derive(Eq, PartialEq, Debug, Clone, Hash, Copy)] pub struct StatKey { pub cluster_id: u64, pub node_id: u64, diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index 78e22a1bc9..af28e19da8 100644 --- a/src/meta-srv/src/lib.rs +++ 
b/src/meta-srv/src/lib.rs @@ -17,8 +17,6 @@ pub mod bootstrap; pub mod cluster; pub mod election; pub mod error; -// TODO(LFC): TBC -#[allow(dead_code)] mod failure_detector; pub mod handler; pub mod keys; diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 40f288cac0..608f40822f 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -66,6 +66,7 @@ pub struct Context { pub catalog: Option, pub schema: Option, pub table: Option, + pub is_infancy: bool, } impl Context { @@ -199,6 +200,7 @@ impl MetaSrv { catalog: None, schema: None, table: None, + is_infancy: false, } } } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index bf319f098a..c39254df48 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use crate::cluster::MetaPeerClient; use crate::handler::{ CheckLeaderHandler, CollectStatsHandler, HeartbeatHandlerGroup, KeepLeaseHandler, - OnLeaderStartHandler, PersistStatsHandler, ResponseHeaderHandler, + OnLeaderStartHandler, PersistStatsHandler, RegionFailureHandler, ResponseHeaderHandler, }; use crate::lock::DistLockRef; use crate::metasrv::{ElectionRef, MetaSrv, MetaSrvOptions, SelectorRef, TABLE_ID_SEQ}; @@ -118,6 +118,9 @@ impl MetaSrvBuilder { let handler_group = match handler_group { Some(handler_group) => handler_group, None => { + let mut region_failure_handler = RegionFailureHandler::new(election.clone()); + region_failure_handler.start().await; + let group = HeartbeatHandlerGroup::default(); let keep_lease_handler = KeepLeaseHandler::new(kv_store.clone()); group.add_handler(ResponseHeaderHandler::default()).await; @@ -127,7 +130,8 @@ impl MetaSrvBuilder { group.add_handler(keep_lease_handler).await; group.add_handler(CheckLeaderHandler::default()).await; group.add_handler(OnLeaderStartHandler::default()).await; - group.add_handler(CollectStatsHandler::default()).await; + group.add_handler(CollectStatsHandler).await; + group.add_handler(region_failure_handler).await; group.add_handler(PersistStatsHandler::default()).await; group }
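
For readers following the failure-detection math in failure_detector.rs above: its doc comment defines the suspicion level as phi = -log10(1 - F(timeSinceLastHeartbeat)), and the patch approximates the normal CDF F with a logistic function. The self-contained sketch below is not part of the patch; the numbers and the main function are invented for illustration only. It shows how that formula behaves around the default threshold of 8. Note that the real PhiAccrualFailureDetector additionally pads the estimated mean with acceptable_heartbeat_pause_millis, so in practice a silent datanode is tolerated for a few extra seconds before being suspected.

// Illustrative sketch only: the same logistic approximation of the normal CDF
// that failure_detector.rs uses for phi = -log10(1 - F(time_since_last_heartbeat)).
fn phi(time_since_last_heartbeat_ms: f64, mean_ms: f64, std_dev_ms: f64) -> f64 {
    let y = (time_since_last_heartbeat_ms - mean_ms) / std_dev_ms;
    let e = (-y * (1.5976 + 0.070566 * y * y)).exp();
    if time_since_last_heartbeat_ms > mean_ms {
        -(e / (1.0 + e)).log10()
    } else {
        -(1.0 - 1.0 / (1.0 + e)).log10()
    }
}

fn main() {
    // Assume heartbeats normally arrive every ~1000 ms with ~100 ms of jitter.
    let (mean, std_dev) = (1000.0, 100.0);
    // Exactly on schedule: phi is about 0.3, far below the default threshold of 8,
    // so is_available would report true for the datanode's regions.
    println!("phi @ 1000 ms = {:.3}", phi(1000.0, mean, std_dev));
    // A whole missed interval: phi jumps to roughly 37, well above the threshold,
    // so is_available would report false for the corresponding regions.
    println!("phi @ 2000 ms = {:.3}", phi(2000.0, mean, std_dev));
}

As the comments ported from Akka note, the threshold trades detection speed for false positives: phi = 1 corresponds to roughly a 10% chance of a wrong suspicion, phi = 2 to about 1%, and phi = 3 to about 0.1%.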