From b9a7c2db7e1812eeec77d7c22c1a3e5c17287e7a Mon Sep 17 00:00:00 2001 From: tison Date: Thu, 9 Nov 2023 14:26:33 +0800 Subject: [PATCH] feat: make PhiAccrualFailureDetector configurable (#2709) * feat: make PhiAccrualFailureDetector configurable Signed-off-by: tison * propagate Signed-off-by: tison * f32 does not implement Eq Signed-off-by: tison * add to test Signed-off-by: tison * fmt clippy Signed-off-by: tison * Update src/meta-srv/src/failure_detector.rs * address comments Signed-off-by: tison --------- Signed-off-by: tison --- config/metasrv.example.toml | 7 ++ src/cmd/src/metasrv.rs | 16 +++++ src/meta-srv/src/failure_detector.rs | 32 +++++++++- src/meta-srv/src/handler/failure_handler.rs | 17 +++-- .../src/handler/failure_handler/runner.rs | 64 ++++++++++++++----- src/meta-srv/src/metasrv.rs | 5 +- src/meta-srv/src/metasrv/builder.rs | 8 ++- 7 files changed, 122 insertions(+), 27 deletions(-) diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index 6832573b05..99cec6a85c 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -28,6 +28,13 @@ max_retry_times = 12 # Initial retry delay of procedures, increases exponentially retry_delay = "500ms" +# Failure detector options. +[failure_detector] +threshold = 8.0 +min_std_deviation_millis = 100.0 +acceptable_heartbeat_pause_millis = 3000 +first_heartbeat_estimate_millis = 1000 + # # Datanode options. # [datanode] # # Datanode client options. 
diff --git a/src/cmd/src/metasrv.rs b/src/cmd/src/metasrv.rs index c0d56828e4..1cde688682 100644 --- a/src/cmd/src/metasrv.rs +++ b/src/cmd/src/metasrv.rs @@ -216,6 +216,12 @@ mod tests { [logging] level = "debug" dir = "/tmp/greptimedb/test/logs" + + [failure_detector] + threshold = 8.0 + min_std_deviation_millis = 100.0 + acceptable_heartbeat_pause_millis = 3000 + first_heartbeat_estimate_millis = 1000 "#; write!(file, "{}", toml_str).unwrap(); @@ -234,6 +240,16 @@ mod tests { assert_eq!(SelectorType::LeaseBased, options.selector); assert_eq!("debug", options.logging.level.as_ref().unwrap()); assert_eq!("/tmp/greptimedb/test/logs".to_string(), options.logging.dir); + assert_eq!(8.0, options.failure_detector.threshold); + assert_eq!(100.0, options.failure_detector.min_std_deviation_millis); + assert_eq!( + 3000, + options.failure_detector.acceptable_heartbeat_pause_millis + ); + assert_eq!( + 1000, + options.failure_detector.first_heartbeat_estimate_millis + ); } #[test] diff --git a/src/meta-srv/src/failure_detector.rs b/src/meta-srv/src/failure_detector.rs index 86463a0cf5..3e93adb73a 100644 --- a/src/meta-srv/src/failure_detector.rs +++ b/src/meta-srv/src/failure_detector.rs @@ -14,6 +14,8 @@ use std::collections::VecDeque; +use serde::{Deserialize, Serialize}; + /// This is our port of Akka's "[PhiAccrualFailureDetector](https://github.com/akka/akka/blob/main/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala)" /// You can find it's document here: /// @@ -58,7 +60,16 @@ pub(crate) struct PhiAccrualFailureDetector { last_heartbeat_millis: Option, } -impl Default for PhiAccrualFailureDetector { +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[serde(default)] +pub struct PhiAccrualFailureDetectorOptions { + pub threshold: f32, + pub min_std_deviation_millis: f32, + pub acceptable_heartbeat_pause_millis: u32, + pub first_heartbeat_estimate_millis: u32, +} + +impl Default for PhiAccrualFailureDetectorOptions { fn default() -> Self 
{ // default configuration is the same as of Akka: // https://github.com/akka/akka/blob/main/akka-cluster/src/main/resources/reference.conf#L181 @@ -67,13 +78,28 @@ impl Default for PhiAccrualFailureDetector { min_std_deviation_millis: 100_f32, acceptable_heartbeat_pause_millis: 3000, first_heartbeat_estimate_millis: 1000, - heartbeat_history: HeartbeatHistory::new(1000), - last_heartbeat_millis: None, } } } +impl Default for PhiAccrualFailureDetector { + fn default() -> Self { + Self::from_options(Default::default()) + } +} + impl PhiAccrualFailureDetector { + pub(crate) fn from_options(options: PhiAccrualFailureDetectorOptions) -> Self { + Self { + threshold: options.threshold, + min_std_deviation_millis: options.min_std_deviation_millis, + acceptable_heartbeat_pause_millis: options.acceptable_heartbeat_pause_millis, + first_heartbeat_estimate_millis: options.first_heartbeat_estimate_millis, + heartbeat_history: HeartbeatHistory::new(1000), + last_heartbeat_millis: None, + } + } + pub(crate) fn heartbeat(&mut self, ts_millis: i64) { if let Some(last_heartbeat_millis) = self.last_heartbeat_millis { if ts_millis < last_heartbeat_millis { diff --git a/src/meta-srv/src/handler/failure_handler.rs b/src/meta-srv/src/handler/failure_handler.rs index d1c39d99e7..7170128961 100644 --- a/src/meta-srv/src/handler/failure_handler.rs +++ b/src/meta-srv/src/handler/failure_handler.rs @@ -23,6 +23,7 @@ use common_meta::RegionIdent; use store_api::storage::RegionId; use crate::error::Result; +use crate::failure_detector::PhiAccrualFailureDetectorOptions; use crate::handler::failure_handler::runner::{FailureDetectControl, FailureDetectRunner}; use crate::handler::{HeartbeatAccumulator, HeartbeatHandler}; use crate::metasrv::{Context, ElectionRef}; @@ -41,11 +42,15 @@ impl RegionFailureHandler { pub(crate) async fn try_new( election: Option, region_failover_manager: Arc, + failure_detector_options: PhiAccrualFailureDetectorOptions, ) -> Result { 
region_failover_manager.try_start()?; - let mut failure_detect_runner = - FailureDetectRunner::new(election, region_failover_manager.clone()); + let mut failure_detect_runner = FailureDetectRunner::new( + election, + region_failover_manager.clone(), + failure_detector_options, + ); failure_detect_runner.start().await; Ok(Self { @@ -112,9 +117,11 @@ mod tests { #[tokio::test(flavor = "multi_thread")] async fn test_handle_heartbeat() { let region_failover_manager = create_region_failover_manager(); - let handler = RegionFailureHandler::try_new(None, region_failover_manager) - .await - .unwrap(); + let failure_detector_options = PhiAccrualFailureDetectorOptions::default(); + let handler = + RegionFailureHandler::try_new(None, region_failover_manager, failure_detector_options) + .await + .unwrap(); let req = &HeartbeatRequest::default(); diff --git a/src/meta-srv/src/handler/failure_handler/runner.rs b/src/meta-srv/src/handler/failure_handler/runner.rs index 6f9ca2ba9f..4d9ad36764 100644 --- a/src/meta-srv/src/handler/failure_handler/runner.rs +++ b/src/meta-srv/src/handler/failure_handler/runner.rs @@ -25,7 +25,7 @@ use tokio::sync::mpsc; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::task::JoinHandle; -use crate::failure_detector::PhiAccrualFailureDetector; +use crate::failure_detector::{PhiAccrualFailureDetector, PhiAccrualFailureDetectorOptions}; use crate::handler::failure_handler::DatanodeHeartbeat; use crate::metasrv::ElectionRef; use crate::procedure::region_failover::RegionFailoverManager; @@ -40,6 +40,7 @@ pub(crate) enum FailureDetectControl { pub(crate) struct FailureDetectRunner { election: Option, region_failover_manager: Arc, + failure_detector_options: PhiAccrualFailureDetectorOptions, heartbeat_tx: Sender, heartbeat_rx: Option>, @@ -55,12 +56,14 @@ impl FailureDetectRunner { pub(super) fn new( election: Option, region_failover_manager: Arc, + failure_detector_options: PhiAccrualFailureDetectorOptions, ) -> Self { let (heartbeat_tx, heartbeat_rx) 
= mpsc::channel::(1024); let (control_tx, control_rx) = mpsc::channel::(1024); Self { election, region_failover_manager, + failure_detector_options, heartbeat_tx, heartbeat_rx: Some(heartbeat_rx), control_tx, @@ -83,7 +86,10 @@ impl FailureDetectRunner { } pub(crate) async fn start(&mut self) { - let failure_detectors = Arc::new(FailureDetectorContainer(DashMap::new())); + let failure_detectors = Arc::new(FailureDetectorContainer { + detectors: DashMap::new(), + options: self.failure_detector_options.clone(), + }); self.start_with(failure_detectors).await } @@ -215,33 +221,49 @@ impl FailureDetectorEntry<'_> { } } -pub(crate) struct FailureDetectorContainer(DashMap); +pub(crate) struct FailureDetectorContainer { + options: PhiAccrualFailureDetectorOptions, + detectors: DashMap, +} impl FailureDetectorContainer { fn get_failure_detector( &self, ident: RegionIdent, ) -> impl DerefMut + '_ { - self.0.entry(ident).or_default() + self.detectors + .entry(ident) + .or_insert_with(|| PhiAccrualFailureDetector::from_options(self.options.clone())) } pub(crate) fn iter(&self) -> Box + '_> { - Box::new(self.0.iter().map(move |e| FailureDetectorEntry { e })) as _ + Box::new( + self.detectors + .iter() + .map(move |e| FailureDetectorEntry { e }), + ) as _ } fn remove(&self, ident: &RegionIdent) { - let _ = self.0.remove(ident); + let _ = self.detectors.remove(ident); } fn clear(&self) { - self.0.clear() + self.detectors.clear() } #[cfg(test)] fn dump(&self) -> FailureDetectorContainer { - let mut m = DashMap::with_capacity(self.0.len()); - m.extend(self.0.iter().map(|x| (x.key().clone(), x.value().clone()))); - Self(m) + let mut m = DashMap::with_capacity(self.detectors.len()); + m.extend( + self.detectors + .iter() + .map(|x| (x.key().clone(), x.value().clone())), + ); + Self { + detectors: m, + options: self.options.clone(), + } } } @@ -254,7 +276,10 @@ mod tests { #[test] fn test_default_failure_detector_container() { - let container = 
FailureDetectorContainer(DashMap::new()); + let container = FailureDetectorContainer { + detectors: DashMap::new(), + options: PhiAccrualFailureDetectorOptions::default(), + }; let ident = RegionIdent { table_id: 1, cluster_id: 3, @@ -263,7 +288,7 @@ mod tests { engine: "mito2".to_string(), }; let _ = container.get_failure_detector(ident.clone()); - assert!(container.0.contains_key(&ident)); + assert!(container.detectors.contains_key(&ident)); { let mut iter = container.iter(); @@ -272,12 +297,15 @@ mod tests { } container.clear(); - assert!(container.0.is_empty()); + assert!(container.detectors.is_empty()); } #[tokio::test(flavor = "multi_thread")] async fn test_control() { - let container = FailureDetectorContainer(DashMap::new()); + let container = FailureDetectorContainer { + detectors: DashMap::new(), + options: PhiAccrualFailureDetectorOptions::default(), + }; let ident = RegionIdent { table_id: 1, @@ -289,7 +317,9 @@ mod tests { let _ = container.get_failure_detector(ident.clone()); let region_failover_manager = create_region_failover_manager(); - let mut runner = FailureDetectRunner::new(None, region_failover_manager); + let failure_detector_options = PhiAccrualFailureDetectorOptions::default(); + let mut runner = + FailureDetectRunner::new(None, region_failover_manager, failure_detector_options); runner.start_with(Arc::new(container)).await; let dump = runner.dump().await; @@ -304,7 +334,9 @@ mod tests { #[tokio::test(flavor = "multi_thread")] async fn test_heartbeat() { let region_failover_manager = create_region_failover_manager(); - let mut runner = FailureDetectRunner::new(None, region_failover_manager); + let failure_detector_options = PhiAccrualFailureDetectorOptions::default(); + let mut runner = + FailureDetectRunner::new(None, region_failover_manager, failure_detector_options); runner.start().await; // Generate 2000 heartbeats start from now. Heartbeat interval is one second, plus some random millis. 
diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index a810a55019..c7fb8c7529 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -42,6 +42,7 @@ use crate::error::{ self, InitMetadataSnafu, Result, StartProcedureManagerSnafu, StartTelemetryTaskSnafu, StopProcedureManagerSnafu, }; +use crate::failure_detector::PhiAccrualFailureDetectorOptions; use crate::handler::HeartbeatHandlerGroup; use crate::lock::DistLockRef; use crate::pubsub::{PublishRef, SubscribeManagerRef}; @@ -53,7 +54,7 @@ use crate::state::{become_follower, become_leader, StateRef}; pub const TABLE_ID_SEQ: &str = "table_id"; pub const METASRV_HOME: &str = "/tmp/metasrv"; -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] #[serde(default)] pub struct MetaSrvOptions { pub bind_addr: String, @@ -65,6 +66,7 @@ pub struct MetaSrvOptions { pub http: HttpOptions, pub logging: LoggingOptions, pub procedure: ProcedureConfig, + pub failure_detector: PhiAccrualFailureDetectorOptions, pub datanode: DatanodeOptions, pub enable_telemetry: bool, pub data_home: String, @@ -88,6 +90,7 @@ impl Default for MetaSrvOptions { max_retry_times: 12, retry_delay: Duration::from_millis(500), }, + failure_detector: PhiAccrualFailureDetectorOptions::default(), datanode: DatanodeOptions::default(), enable_telemetry: true, data_home: METASRV_HOME.to_string(), diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 90d3404662..8ad55b7999 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -211,8 +211,12 @@ impl MetaSrvBuilder { table_metadata_manager.clone(), )); Some( - RegionFailureHandler::try_new(election.clone(), region_failover_manager) - .await?, + RegionFailureHandler::try_new( + election.clone(), + region_failover_manager, + options.failure_detector.clone(), + ) + .await?, ) } else { None