chore: adding heartbeat sent/recv counts in greptimedb nodes (#4624)

obs: adding heartbeat sent/recv counts in greptimedb nodes
This commit is contained in:
Lanqing Yang
2024-08-29 20:57:16 -07:00
committed by GitHub
parent 8ea4f67e4b
commit 9286e963e7
6 changed files with 48 additions and 2 deletions

View File

@@ -37,7 +37,7 @@ use crate::alive_keeper::RegionAliveKeeper;
use crate::config::DatanodeOptions;
use crate::error::{self, MetaClientInitSnafu, Result};
use crate::event_listener::RegionServerEventReceiver;
use crate::metrics;
use crate::metrics::{self, HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
use crate::region_server::RegionServer;
pub(crate) mod handler;
@@ -231,10 +231,12 @@ impl HeartbeatTask {
mailbox_message: Some(message),
..Default::default()
};
HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
Some(req)
}
Err(e) => {
error!(e; "Failed to encode mailbox messages!");
HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc();
None
}
}
@@ -304,6 +306,8 @@ impl HeartbeatTask {
error!(e; "Failed to reconnect to metasrv!");
}
}
} else {
HEARTBEAT_SENT_COUNT.inc();
}
}
}

View File

@@ -54,4 +54,17 @@ lazy_static! {
&[REGION_ROLE]
)
.unwrap();
/// The number of heartbeats send by datanode.
pub static ref HEARTBEAT_SENT_COUNT: IntCounter = register_int_counter!(
"greptime_datanode_heartbeat_send_count",
"datanode heartbeat sent",
)
.unwrap();
/// The number of heartbeats received by datanode, labeled with result type.
pub static ref HEARTBEAT_RECV_COUNT: IntCounterVec = register_int_counter_vec!(
"greptime_datanode_heartbeat_recv_count",
"datanode heartbeat received",
&["result"]
)
.unwrap();
}

View File

@@ -32,6 +32,7 @@ use tokio::time::{Duration, Instant};
use crate::error;
use crate::error::Result;
use crate::frontend::FrontendOptions;
use crate::metrics::{HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
pub mod handler;
@@ -94,10 +95,16 @@ impl HeartbeatTask {
let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), resp);
if let Err(e) = capture_self.handle_response(ctx).await {
error!(e; "Error while handling heartbeat response");
HEARTBEAT_RECV_COUNT
.with_label_values(&["processing_error"])
.inc();
} else {
HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
}
}
Ok(None) => break,
Err(e) => {
HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc();
error!(e; "Occur error while reading heartbeat response");
capture_self
.start_with_retry(Duration::from_millis(retry_interval))
@@ -180,6 +187,7 @@ impl HeartbeatTask {
error!(e; "Failed to send heartbeat to metasrv");
break;
} else {
HEARTBEAT_SENT_COUNT.inc();
debug!("Send a heartbeat request to metasrv, content: {:?}", req);
}
}

View File

@@ -51,4 +51,17 @@ lazy_static! {
"frontend otlp traces rows"
)
.unwrap();
/// The number of heartbeats send by frontend node.
pub static ref HEARTBEAT_SENT_COUNT: IntCounter = register_int_counter!(
"greptime_frontend_heartbeat_send_count",
"frontend heartbeat sent",
)
.unwrap();
/// The number of heartbeats received by frontend node, labeled with result type.
pub static ref HEARTBEAT_RECV_COUNT: IntCounterVec = register_int_counter_vec!(
"greptime_frontend_heartbeat_recv_count",
"frontend heartbeat received",
&["result"]
)
.unwrap();
}

View File

@@ -45,4 +45,7 @@ lazy_static! {
/// Meta kv cache miss counter.
pub static ref METRIC_META_KV_CACHE_MISS: IntCounterVec =
register_int_counter_vec!("greptime_meta_kv_cache_miss", "meta kv cache miss", &["op"]).unwrap();
// Heartbeat received by metasrv.
pub static ref METRIC_META_HEARTBEAT_RECV: IntCounterVec =
register_int_counter_vec!("greptime_meta_heartbeat_recv", "heartbeats received by metasrv", &["pusher_key"]).unwrap();
}

View File

@@ -32,6 +32,7 @@ use crate::error;
use crate::error::Result;
use crate::handler::{HeartbeatHandlerGroup, Pusher};
use crate::metasrv::{Context, Metasrv};
use crate::metrics::METRIC_META_HEARTBEAT_RECV;
use crate::service::{GrpcResult, GrpcStream};
#[async_trait::async_trait]
@@ -65,7 +66,11 @@ impl heartbeat_server::Heartbeat for Metasrv {
if pusher_key.is_none() {
pusher_key = register_pusher(&handler_group, header, tx.clone()).await;
}
if let Some(k) = &pusher_key {
METRIC_META_HEARTBEAT_RECV.with_label_values(&[k]);
} else {
METRIC_META_HEARTBEAT_RECV.with_label_values(&["none"]);
}
let res = handler_group
.handle(req, ctx.clone())
.await