mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-18 14:00:39 +00:00
feat: filter out empty heartbeat req (#2345)
* feat: filter out empty heartbeat request * fix: big mistake
This commit is contained in:
@@ -196,11 +196,9 @@ impl HeartbeatTask {
|
||||
if let Some(message) = message {
|
||||
match outgoing_message_to_mailbox_message(message) {
|
||||
Ok(message) => {
|
||||
let peer = Some(Peer { id: node_id, addr: addr.clone() });
|
||||
let req = HeartbeatRequest {
|
||||
peer: Some(Peer {
|
||||
id: node_id,
|
||||
addr: addr.clone(),
|
||||
}),
|
||||
peer,
|
||||
mailbox_message: Some(message),
|
||||
..Default::default()
|
||||
};
|
||||
@@ -216,19 +214,18 @@ impl HeartbeatTask {
|
||||
}
|
||||
}
|
||||
_ = &mut sleep => {
|
||||
// TODO(jeremy): refactor load_status
|
||||
let (_,region_stats) = Self::load_stats(®ion_server_clone).await;
|
||||
let peer = Some(Peer { id: node_id, addr: addr.clone() });
|
||||
let region_stats = Self::load_region_stats(®ion_server_clone).await;
|
||||
let now = Instant::now();
|
||||
let duration_since_epoch = (now - epoch).as_millis() as u64;
|
||||
let req = HeartbeatRequest {
|
||||
peer: Some(Peer {
|
||||
id: node_id,
|
||||
addr: addr.clone(),
|
||||
}),
|
||||
peer,
|
||||
region_stats,
|
||||
duration_since_epoch: (Instant::now() - epoch).as_millis() as u64,
|
||||
duration_since_epoch,
|
||||
node_epoch,
|
||||
..Default::default()
|
||||
};
|
||||
sleep.as_mut().reset(Instant::now() + Duration::from_millis(interval));
|
||||
sleep.as_mut().reset(now + Duration::from_millis(interval));
|
||||
Some(req)
|
||||
}
|
||||
};
|
||||
@@ -260,9 +257,9 @@ impl HeartbeatTask {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn load_stats(region_server: &RegionServer) -> (u64, Vec<RegionStat>) {
|
||||
async fn load_region_stats(region_server: &RegionServer) -> Vec<RegionStat> {
|
||||
let region_ids = region_server.opened_region_ids();
|
||||
let region_stats = region_ids
|
||||
region_ids
|
||||
.into_iter()
|
||||
.map(|region_id| RegionStat {
|
||||
// TODO(ruihang): scratch more info
|
||||
@@ -270,9 +267,7 @@ impl HeartbeatTask {
|
||||
engine: "MitoEngine".to_string(),
|
||||
..Default::default()
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
(region_stats.len() as _, region_stats)
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
|
||||
pub async fn close(&self) -> Result<()> {
|
||||
|
||||
@@ -42,7 +42,10 @@ impl HeartbeatHandler for CollectStatsHandler {
|
||||
|
||||
match Stat::try_from(req.clone()) {
|
||||
Ok(stat) => {
|
||||
let _ = acc.stat.insert(stat);
|
||||
// If stat is empty, it means the request is a mailbox response
|
||||
if !stat.is_empty() {
|
||||
let _ = acc.stat.insert(stat);
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
warn!("Incomplete heartbeat data: {:?}, err: {:?}", req, err);
|
||||
|
||||
@@ -52,6 +52,11 @@ pub struct RegionStat {
|
||||
}
|
||||
|
||||
impl Stat {
|
||||
#[inline]
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.region_stats.is_empty()
|
||||
}
|
||||
|
||||
pub fn stat_key(&self) -> StatKey {
|
||||
StatKey {
|
||||
cluster_id: self.cluster_id,
|
||||
|
||||
Reference in New Issue
Block a user