mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-19 14:30:43 +00:00
feat: refine failure detector (#7005)
* feat: refine failure detector Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * fix format Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * revert back default value Signed-off-by: Ruihang Xia <waynestxia@gmail.com> * revert change of test Signed-off-by: Ruihang Xia <waynestxia@gmail.com> --------- Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
@@ -399,7 +399,6 @@ mod tests {
|
||||
threshold = 8.0
|
||||
min_std_deviation = "100ms"
|
||||
acceptable_heartbeat_pause = "3000ms"
|
||||
first_heartbeat_estimate = "1000ms"
|
||||
"#;
|
||||
write!(file, "{}", toml_str).unwrap();
|
||||
|
||||
@@ -430,13 +429,6 @@ mod tests {
|
||||
.acceptable_heartbeat_pause
|
||||
.as_millis()
|
||||
);
|
||||
assert_eq!(
|
||||
1000,
|
||||
options
|
||||
.failure_detector
|
||||
.first_heartbeat_estimate
|
||||
.as_millis()
|
||||
);
|
||||
assert_eq!(
|
||||
options.procedure.max_metadata_value_size,
|
||||
Some(ReadableSize::kb(1500))
|
||||
|
||||
@@ -18,6 +18,8 @@ use std::time::Duration;
|
||||
use common_meta::distributed_time_constants;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
const FIRST_HEARTBEAT_ESTIMATE_MILLIS: i64 = 1000;
|
||||
|
||||
/// This is our port of Akka's "[PhiAccrualFailureDetector](https://github.com/akka/akka/blob/v2.6.21/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala)"
|
||||
/// under Apache License 2.0.
|
||||
///
|
||||
@@ -56,10 +58,6 @@ pub(crate) struct PhiAccrualFailureDetector {
|
||||
/// arrivals, due to for example network drop.
|
||||
acceptable_heartbeat_pause_millis: u32,
|
||||
|
||||
/// Bootstrap the stats with heartbeats that corresponds to this duration, with a rather high
|
||||
/// standard deviation (since environment is unknown in the beginning).
|
||||
first_heartbeat_estimate_millis: u32,
|
||||
|
||||
heartbeat_history: HeartbeatHistory,
|
||||
last_heartbeat_millis: Option<i64>,
|
||||
}
|
||||
@@ -72,8 +70,6 @@ pub struct PhiAccrualFailureDetectorOptions {
|
||||
pub min_std_deviation: Duration,
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub acceptable_heartbeat_pause: Duration,
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub first_heartbeat_estimate: Duration,
|
||||
}
|
||||
|
||||
impl Default for PhiAccrualFailureDetectorOptions {
|
||||
@@ -86,7 +82,6 @@ impl Default for PhiAccrualFailureDetectorOptions {
|
||||
acceptable_heartbeat_pause: Duration::from_secs(
|
||||
distributed_time_constants::DATANODE_LEASE_SECS,
|
||||
),
|
||||
first_heartbeat_estimate: Duration::from_millis(1000),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -104,7 +99,6 @@ impl PhiAccrualFailureDetector {
|
||||
min_std_deviation_millis: options.min_std_deviation.as_millis() as f32,
|
||||
acceptable_heartbeat_pause_millis: options.acceptable_heartbeat_pause.as_millis()
|
||||
as u32,
|
||||
first_heartbeat_estimate_millis: options.first_heartbeat_estimate.as_millis() as u32,
|
||||
heartbeat_history: HeartbeatHistory::new(1000),
|
||||
last_heartbeat_millis: None,
|
||||
}
|
||||
@@ -124,11 +118,11 @@ impl PhiAccrualFailureDetector {
|
||||
// guess statistics for first heartbeat,
|
||||
// important so that connections with only one heartbeat becomes unavailable
|
||||
// bootstrap with 2 entries with rather high standard deviation
|
||||
let std_deviation = self.first_heartbeat_estimate_millis / 4;
|
||||
let std_deviation = FIRST_HEARTBEAT_ESTIMATE_MILLIS / 4;
|
||||
self.heartbeat_history
|
||||
.add((self.first_heartbeat_estimate_millis - std_deviation) as _);
|
||||
.add((FIRST_HEARTBEAT_ESTIMATE_MILLIS - std_deviation) as _);
|
||||
self.heartbeat_history
|
||||
.add((self.first_heartbeat_estimate_millis + std_deviation) as _);
|
||||
.add((FIRST_HEARTBEAT_ESTIMATE_MILLIS + std_deviation) as _);
|
||||
}
|
||||
let _ = self.last_heartbeat_millis.insert(ts_millis);
|
||||
}
|
||||
@@ -367,7 +361,6 @@ mod tests {
|
||||
threshold: 8.0,
|
||||
min_std_deviation_millis: 100.0,
|
||||
acceptable_heartbeat_pause_millis: 0,
|
||||
first_heartbeat_estimate_millis: 1000,
|
||||
heartbeat_history: HeartbeatHistory::new(1000),
|
||||
last_heartbeat_millis: None,
|
||||
};
|
||||
@@ -381,14 +374,13 @@ mod tests {
|
||||
threshold: 8.0,
|
||||
min_std_deviation_millis: 100.0,
|
||||
acceptable_heartbeat_pause_millis: 0,
|
||||
first_heartbeat_estimate_millis: 1000,
|
||||
heartbeat_history: HeartbeatHistory::new(1000),
|
||||
last_heartbeat_millis: None,
|
||||
};
|
||||
fd.heartbeat(0);
|
||||
assert!((fd.phi(1000)).abs() - 0.3 < 0.2);
|
||||
assert!((fd.phi(2000)).abs() - 4.5 < 0.3);
|
||||
assert!((fd.phi(3000)).abs() > 15.0);
|
||||
assert!((fd.phi(FIRST_HEARTBEAT_ESTIMATE_MILLIS)).abs() - 0.3 < 0.2);
|
||||
assert!((fd.phi(FIRST_HEARTBEAT_ESTIMATE_MILLIS * 2)).abs() - 4.5 < 0.3);
|
||||
assert!((fd.phi(FIRST_HEARTBEAT_ESTIMATE_MILLIS * 3)).abs() > 15.0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -397,7 +389,6 @@ mod tests {
|
||||
threshold: 8.0,
|
||||
min_std_deviation_millis: 100.0,
|
||||
acceptable_heartbeat_pause_millis: 0,
|
||||
first_heartbeat_estimate_millis: 1000,
|
||||
heartbeat_history: HeartbeatHistory::new(1000),
|
||||
last_heartbeat_millis: None,
|
||||
};
|
||||
@@ -413,7 +404,6 @@ mod tests {
|
||||
threshold: 8.0,
|
||||
min_std_deviation_millis: 100.0,
|
||||
acceptable_heartbeat_pause_millis: 0,
|
||||
first_heartbeat_estimate_millis: 1000,
|
||||
heartbeat_history: HeartbeatHistory::new(1000),
|
||||
last_heartbeat_millis: None,
|
||||
};
|
||||
@@ -431,7 +421,6 @@ mod tests {
|
||||
threshold: 3.0,
|
||||
min_std_deviation_millis: 100.0,
|
||||
acceptable_heartbeat_pause_millis: 0,
|
||||
first_heartbeat_estimate_millis: 1000,
|
||||
heartbeat_history: HeartbeatHistory::new(1000),
|
||||
last_heartbeat_millis: None,
|
||||
};
|
||||
@@ -449,7 +438,6 @@ mod tests {
|
||||
threshold: 8.0,
|
||||
min_std_deviation_millis: 100.0,
|
||||
acceptable_heartbeat_pause_millis: 3000,
|
||||
first_heartbeat_estimate_millis: 1000,
|
||||
heartbeat_history: HeartbeatHistory::new(1000),
|
||||
last_heartbeat_millis: None,
|
||||
};
|
||||
@@ -488,7 +476,6 @@ mod tests {
|
||||
threshold: 8.0,
|
||||
min_std_deviation_millis: 100.0,
|
||||
acceptable_heartbeat_pause_millis: 3000,
|
||||
first_heartbeat_estimate_millis: 1000,
|
||||
heartbeat_history: HeartbeatHistory::new(1000),
|
||||
last_heartbeat_millis: None,
|
||||
};
|
||||
@@ -507,7 +494,6 @@ mod tests {
|
||||
threshold: 8.0,
|
||||
min_std_deviation_millis: 100.0,
|
||||
acceptable_heartbeat_pause_millis: 3000,
|
||||
first_heartbeat_estimate_millis: 1000,
|
||||
heartbeat_history: HeartbeatHistory::new(1000),
|
||||
last_heartbeat_millis: None,
|
||||
};
|
||||
@@ -528,7 +514,6 @@ mod tests {
|
||||
threshold: 8.0,
|
||||
min_std_deviation_millis: 100.0,
|
||||
acceptable_heartbeat_pause_millis: 0,
|
||||
first_heartbeat_estimate_millis: 1000,
|
||||
heartbeat_history: HeartbeatHistory::new(3),
|
||||
last_heartbeat_millis: None,
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user