feat: make PhiAccrualFailureDetector configurable (#2709)

* feat: make PhiAccrualFailureDetector configurable

Signed-off-by: tison <wander4096@gmail.com>

* propagate

Signed-off-by: tison <wander4096@gmail.com>

* f32 does not implement Eq

Signed-off-by: tison <wander4096@gmail.com>

* add to test

Signed-off-by: tison <wander4096@gmail.com>

* fmt clippy

Signed-off-by: tison <wander4096@gmail.com>

* Update src/meta-srv/src/failure_detector.rs

* address comments

Signed-off-by: tison <wander4096@gmail.com>

---------

Signed-off-by: tison <wander4096@gmail.com>
This commit is contained in:
tison
2023-11-09 14:26:33 +08:00
committed by GitHub
parent c62ba79759
commit b9a7c2db7e
7 changed files with 122 additions and 27 deletions

View File

@@ -28,6 +28,13 @@ max_retry_times = 12
# Initial retry delay of procedures, increases exponentially
retry_delay = "500ms"
# Failure detectors options.
[failure_detector]
threshold = 8.0
min_std_deviation_millis = 100.0
acceptable_heartbeat_pause_millis = 3000
first_heartbeat_estimate_millis = 1000
# # Datanode options.
# [datanode]
# # Datanode client options.

View File

@@ -216,6 +216,12 @@ mod tests {
[logging]
level = "debug"
dir = "/tmp/greptimedb/test/logs"
[failure_detector]
threshold = 8.0
min_std_deviation_millis = 100.0
acceptable_heartbeat_pause_millis = 3000
first_heartbeat_estimate_millis = 1000
"#;
write!(file, "{}", toml_str).unwrap();
@@ -234,6 +240,16 @@ mod tests {
assert_eq!(SelectorType::LeaseBased, options.selector);
assert_eq!("debug", options.logging.level.as_ref().unwrap());
assert_eq!("/tmp/greptimedb/test/logs".to_string(), options.logging.dir);
assert_eq!(8.0, options.failure_detector.threshold);
assert_eq!(100.0, options.failure_detector.min_std_deviation_millis);
assert_eq!(
3000,
options.failure_detector.acceptable_heartbeat_pause_millis
);
assert_eq!(
1000,
options.failure_detector.first_heartbeat_estimate_millis
);
}
#[test]

View File

@@ -14,6 +14,8 @@
use std::collections::VecDeque;
use serde::{Deserialize, Serialize};
/// This is our port of Akka's "[PhiAccrualFailureDetector](https://github.com/akka/akka/blob/main/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala)"
/// You can find its documentation here:
/// <https://doc.akka.io/docs/akka/current/typed/failure-detector.html>
@@ -58,7 +60,16 @@ pub(crate) struct PhiAccrualFailureDetector {
last_heartbeat_millis: Option<i64>,
}
impl Default for PhiAccrualFailureDetector {
/// Tuning knobs for the phi accrual failure detector.
///
/// Defaults mirror Akka's reference configuration (see the `Default` impl).
/// NOTE(review): field semantics below are inferred from the names and the
/// Akka reference config — confirm against the detector implementation.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct PhiAccrualFailureDetectorOptions {
/// Phi value above which a peer is suspected to have failed.
pub threshold: f32,
/// Lower bound (ms) on the standard deviation of heartbeat intervals.
pub min_std_deviation_millis: f32,
/// Heartbeat pause (ms) tolerated before suspicion starts growing.
pub acceptable_heartbeat_pause_millis: u32,
/// Interval estimate (ms) used before the first real heartbeat arrives.
pub first_heartbeat_estimate_millis: u32,
}
impl Default for PhiAccrualFailureDetectorOptions {
fn default() -> Self {
// default configuration is the same as of Akka:
// https://github.com/akka/akka/blob/main/akka-cluster/src/main/resources/reference.conf#L181
@@ -67,13 +78,28 @@ impl Default for PhiAccrualFailureDetector {
min_std_deviation_millis: 100_f32,
acceptable_heartbeat_pause_millis: 3000,
first_heartbeat_estimate_millis: 1000,
heartbeat_history: HeartbeatHistory::new(1000),
last_heartbeat_millis: None,
}
}
}
// A default detector is simply one built from the default options
// (which match Akka's reference configuration).
impl Default for PhiAccrualFailureDetector {
fn default() -> Self {
Self::from_options(Default::default())
}
}
impl PhiAccrualFailureDetector {
/// Builds a detector from user-supplied `options`.
///
/// Only the four tunable fields come from `options`; the heartbeat history
/// capacity stays fixed and no heartbeat is recorded yet.
pub(crate) fn from_options(options: PhiAccrualFailureDetectorOptions) -> Self {
Self {
threshold: options.threshold,
min_std_deviation_millis: options.min_std_deviation_millis,
acceptable_heartbeat_pause_millis: options.acceptable_heartbeat_pause_millis,
first_heartbeat_estimate_millis: options.first_heartbeat_estimate_millis,
// History capacity is fixed at 1000 samples regardless of options.
heartbeat_history: HeartbeatHistory::new(1000),
// No heartbeat observed yet.
last_heartbeat_millis: None,
}
}
pub(crate) fn heartbeat(&mut self, ts_millis: i64) {
if let Some(last_heartbeat_millis) = self.last_heartbeat_millis {
if ts_millis < last_heartbeat_millis {

View File

@@ -23,6 +23,7 @@ use common_meta::RegionIdent;
use store_api::storage::RegionId;
use crate::error::Result;
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::handler::failure_handler::runner::{FailureDetectControl, FailureDetectRunner};
use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::{Context, ElectionRef};
@@ -41,11 +42,15 @@ impl RegionFailureHandler {
pub(crate) async fn try_new(
election: Option<ElectionRef>,
region_failover_manager: Arc<RegionFailoverManager>,
failure_detector_options: PhiAccrualFailureDetectorOptions,
) -> Result<Self> {
region_failover_manager.try_start()?;
let mut failure_detect_runner =
FailureDetectRunner::new(election, region_failover_manager.clone());
let mut failure_detect_runner = FailureDetectRunner::new(
election,
region_failover_manager.clone(),
failure_detector_options,
);
failure_detect_runner.start().await;
Ok(Self {
@@ -112,9 +117,11 @@ mod tests {
#[tokio::test(flavor = "multi_thread")]
async fn test_handle_heartbeat() {
let region_failover_manager = create_region_failover_manager();
let handler = RegionFailureHandler::try_new(None, region_failover_manager)
.await
.unwrap();
let failure_detector_options = PhiAccrualFailureDetectorOptions::default();
let handler =
RegionFailureHandler::try_new(None, region_failover_manager, failure_detector_options)
.await
.unwrap();
let req = &HeartbeatRequest::default();

View File

@@ -25,7 +25,7 @@ use tokio::sync::mpsc;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::task::JoinHandle;
use crate::failure_detector::PhiAccrualFailureDetector;
use crate::failure_detector::{PhiAccrualFailureDetector, PhiAccrualFailureDetectorOptions};
use crate::handler::failure_handler::DatanodeHeartbeat;
use crate::metasrv::ElectionRef;
use crate::procedure::region_failover::RegionFailoverManager;
@@ -40,6 +40,7 @@ pub(crate) enum FailureDetectControl {
pub(crate) struct FailureDetectRunner {
election: Option<ElectionRef>,
region_failover_manager: Arc<RegionFailoverManager>,
failure_detector_options: PhiAccrualFailureDetectorOptions,
heartbeat_tx: Sender<DatanodeHeartbeat>,
heartbeat_rx: Option<Receiver<DatanodeHeartbeat>>,
@@ -55,12 +56,14 @@ impl FailureDetectRunner {
pub(super) fn new(
election: Option<ElectionRef>,
region_failover_manager: Arc<RegionFailoverManager>,
failure_detector_options: PhiAccrualFailureDetectorOptions,
) -> Self {
let (heartbeat_tx, heartbeat_rx) = mpsc::channel::<DatanodeHeartbeat>(1024);
let (control_tx, control_rx) = mpsc::channel::<FailureDetectControl>(1024);
Self {
election,
region_failover_manager,
failure_detector_options,
heartbeat_tx,
heartbeat_rx: Some(heartbeat_rx),
control_tx,
@@ -83,7 +86,10 @@ impl FailureDetectRunner {
}
pub(crate) async fn start(&mut self) {
let failure_detectors = Arc::new(FailureDetectorContainer(DashMap::new()));
let failure_detectors = Arc::new(FailureDetectorContainer {
detectors: DashMap::new(),
options: self.failure_detector_options.clone(),
});
self.start_with(failure_detectors).await
}
@@ -215,33 +221,49 @@ impl FailureDetectorEntry<'_> {
}
}
pub(crate) struct FailureDetectorContainer(DashMap<RegionIdent, PhiAccrualFailureDetector>);
pub(crate) struct FailureDetectorContainer {
options: PhiAccrualFailureDetectorOptions,
detectors: DashMap<RegionIdent, PhiAccrualFailureDetector>,
}
impl FailureDetectorContainer {
fn get_failure_detector(
&self,
ident: RegionIdent,
) -> impl DerefMut<Target = PhiAccrualFailureDetector> + '_ {
self.0.entry(ident).or_default()
self.detectors
.entry(ident)
.or_insert_with(|| PhiAccrualFailureDetector::from_options(self.options.clone()))
}
pub(crate) fn iter(&self) -> Box<dyn Iterator<Item = FailureDetectorEntry> + '_> {
Box::new(self.0.iter().map(move |e| FailureDetectorEntry { e })) as _
Box::new(
self.detectors
.iter()
.map(move |e| FailureDetectorEntry { e }),
) as _
}
fn remove(&self, ident: &RegionIdent) {
let _ = self.0.remove(ident);
let _ = self.detectors.remove(ident);
}
fn clear(&self) {
self.0.clear()
self.detectors.clear()
}
#[cfg(test)]
fn dump(&self) -> FailureDetectorContainer {
let mut m = DashMap::with_capacity(self.0.len());
m.extend(self.0.iter().map(|x| (x.key().clone(), x.value().clone())));
Self(m)
let mut m = DashMap::with_capacity(self.detectors.len());
m.extend(
self.detectors
.iter()
.map(|x| (x.key().clone(), x.value().clone())),
);
Self {
detectors: m,
options: self.options.clone(),
}
}
}
@@ -254,7 +276,10 @@ mod tests {
#[test]
fn test_default_failure_detector_container() {
let container = FailureDetectorContainer(DashMap::new());
let container = FailureDetectorContainer {
detectors: DashMap::new(),
options: PhiAccrualFailureDetectorOptions::default(),
};
let ident = RegionIdent {
table_id: 1,
cluster_id: 3,
@@ -263,7 +288,7 @@ mod tests {
engine: "mito2".to_string(),
};
let _ = container.get_failure_detector(ident.clone());
assert!(container.0.contains_key(&ident));
assert!(container.detectors.contains_key(&ident));
{
let mut iter = container.iter();
@@ -272,12 +297,15 @@ mod tests {
}
container.clear();
assert!(container.0.is_empty());
assert!(container.detectors.is_empty());
}
#[tokio::test(flavor = "multi_thread")]
async fn test_control() {
let container = FailureDetectorContainer(DashMap::new());
let container = FailureDetectorContainer {
detectors: DashMap::new(),
options: PhiAccrualFailureDetectorOptions::default(),
};
let ident = RegionIdent {
table_id: 1,
@@ -289,7 +317,9 @@ mod tests {
let _ = container.get_failure_detector(ident.clone());
let region_failover_manager = create_region_failover_manager();
let mut runner = FailureDetectRunner::new(None, region_failover_manager);
let failure_detector_options = PhiAccrualFailureDetectorOptions::default();
let mut runner =
FailureDetectRunner::new(None, region_failover_manager, failure_detector_options);
runner.start_with(Arc::new(container)).await;
let dump = runner.dump().await;
@@ -304,7 +334,9 @@ mod tests {
#[tokio::test(flavor = "multi_thread")]
async fn test_heartbeat() {
let region_failover_manager = create_region_failover_manager();
let mut runner = FailureDetectRunner::new(None, region_failover_manager);
let failure_detector_options = PhiAccrualFailureDetectorOptions::default();
let mut runner =
FailureDetectRunner::new(None, region_failover_manager, failure_detector_options);
runner.start().await;
// Generate 2000 heartbeats start from now. Heartbeat interval is one second, plus some random millis.

View File

@@ -42,6 +42,7 @@ use crate::error::{
self, InitMetadataSnafu, Result, StartProcedureManagerSnafu, StartTelemetryTaskSnafu,
StopProcedureManagerSnafu,
};
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::handler::HeartbeatHandlerGroup;
use crate::lock::DistLockRef;
use crate::pubsub::{PublishRef, SubscribeManagerRef};
@@ -53,7 +54,7 @@ use crate::state::{become_follower, become_leader, StateRef};
pub const TABLE_ID_SEQ: &str = "table_id";
pub const METASRV_HOME: &str = "/tmp/metasrv";
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct MetaSrvOptions {
pub bind_addr: String,
@@ -65,6 +66,7 @@ pub struct MetaSrvOptions {
pub http: HttpOptions,
pub logging: LoggingOptions,
pub procedure: ProcedureConfig,
pub failure_detector: PhiAccrualFailureDetectorOptions,
pub datanode: DatanodeOptions,
pub enable_telemetry: bool,
pub data_home: String,
@@ -88,6 +90,7 @@ impl Default for MetaSrvOptions {
max_retry_times: 12,
retry_delay: Duration::from_millis(500),
},
failure_detector: PhiAccrualFailureDetectorOptions::default(),
datanode: DatanodeOptions::default(),
enable_telemetry: true,
data_home: METASRV_HOME.to_string(),

View File

@@ -211,8 +211,12 @@ impl MetaSrvBuilder {
table_metadata_manager.clone(),
));
Some(
RegionFailureHandler::try_new(election.clone(), region_failover_manager)
.await?,
RegionFailureHandler::try_new(
election.clone(),
region_failover_manager,
options.failure_detector.clone(),
)
.await?,
)
} else {
None