mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-14 12:00:40 +00:00
feat: add build info for flow heartbeat task (#4228)
* chore: refactor load region stats * feat: add build info for flow heartbeat
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -3966,6 +3966,7 @@ dependencies = [
|
||||
"common-runtime",
|
||||
"common-telemetry",
|
||||
"common-time",
|
||||
"common-version",
|
||||
"datafusion 38.0.0",
|
||||
"datafusion-common 38.0.0",
|
||||
"datafusion-expr 38.0.0",
|
||||
|
||||
@@ -136,7 +136,6 @@ impl Datanode {
|
||||
if let Some(heartbeat_task) = &self.heartbeat_task {
|
||||
heartbeat_task
|
||||
.close()
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(ShutdownInstanceSnafu)?;
|
||||
}
|
||||
|
||||
@@ -158,7 +158,7 @@ impl HeartbeatTask {
|
||||
ctx: HeartbeatResponseHandlerContext,
|
||||
handler_executor: HeartbeatResponseHandlerExecutorRef,
|
||||
) -> Result<()> {
|
||||
trace!("heartbeat response: {:?}", ctx.response);
|
||||
trace!("Heartbeat response: {:?}", ctx.response);
|
||||
handler_executor
|
||||
.handle(ctx)
|
||||
.await
|
||||
@@ -245,7 +245,7 @@ impl HeartbeatTask {
|
||||
}
|
||||
_ = &mut sleep => {
|
||||
let build_info = common_version::build_info();
|
||||
let region_stats = Self::load_region_stats(®ion_server_clone).await;
|
||||
let region_stats = Self::load_region_stats(®ion_server_clone);
|
||||
let now = Instant::now();
|
||||
let duration_since_epoch = (now - epoch).as_millis() as u64;
|
||||
let req = HeartbeatRequest {
|
||||
@@ -313,30 +313,23 @@ impl HeartbeatTask {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn load_region_stats(region_server: &RegionServer) -> Vec<RegionStat> {
|
||||
let regions = region_server.reportable_regions();
|
||||
|
||||
let mut region_stats = Vec::new();
|
||||
for stat in regions {
|
||||
let approximate_bytes = region_server
|
||||
.region_disk_usage(stat.region_id)
|
||||
.await
|
||||
.unwrap_or(0);
|
||||
let region_stat = RegionStat {
|
||||
fn load_region_stats(region_server: &RegionServer) -> Vec<RegionStat> {
|
||||
region_server
|
||||
.reportable_regions()
|
||||
.into_iter()
|
||||
.map(|stat| RegionStat {
|
||||
region_id: stat.region_id.as_u64(),
|
||||
engine: stat.engine,
|
||||
role: RegionRole::from(stat.role).into(),
|
||||
approximate_bytes,
|
||||
// TODO(ruihang): scratch more info
|
||||
// TODO(jeremy): w/rcus
|
||||
rcus: 0,
|
||||
wcus: 0,
|
||||
};
|
||||
region_stats.push(region_stat);
|
||||
}
|
||||
region_stats
|
||||
approximate_bytes: region_server.region_disk_usage(stat.region_id).unwrap_or(0),
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub async fn close(&self) -> Result<()> {
|
||||
pub fn close(&self) -> Result<()> {
|
||||
let running = self.running.clone();
|
||||
if running
|
||||
.compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
|
||||
|
||||
@@ -290,7 +290,7 @@ impl RegionServer {
|
||||
self.inner.runtime.clone()
|
||||
}
|
||||
|
||||
pub async fn region_disk_usage(&self, region_id: RegionId) -> Option<i64> {
|
||||
pub fn region_disk_usage(&self, region_id: RegionId) -> Option<i64> {
|
||||
match self.inner.region_map.get(®ion_id) {
|
||||
Some(e) => e.region_disk_usage(region_id),
|
||||
None => None,
|
||||
|
||||
@@ -26,6 +26,7 @@ common-recordbatch.workspace = true
|
||||
common-runtime.workspace = true
|
||||
common-telemetry.workspace = true
|
||||
common-time.workspace = true
|
||||
common-version.workspace = true
|
||||
datafusion.workspace = true
|
||||
datafusion-common.workspace = true
|
||||
datafusion-expr.workspace = true
|
||||
|
||||
@@ -24,6 +24,7 @@ use common_meta::heartbeat::handler::{
|
||||
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
|
||||
use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
|
||||
use common_telemetry::{debug, error, info};
|
||||
use greptime_proto::v1::meta::NodeInfo;
|
||||
use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
|
||||
use servers::addrs;
|
||||
use servers::heartbeat_options::HeartbeatOptions;
|
||||
@@ -43,6 +44,7 @@ pub struct HeartbeatTask {
|
||||
report_interval: Duration,
|
||||
retry_interval: Duration,
|
||||
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
|
||||
start_time_ms: u64,
|
||||
}
|
||||
|
||||
impl HeartbeatTask {
|
||||
@@ -59,6 +61,7 @@ impl HeartbeatTask {
|
||||
report_interval: heartbeat_opts.interval,
|
||||
retry_interval: heartbeat_opts.retry_interval,
|
||||
resp_handler_executor,
|
||||
start_time_ms: common_time::util::current_time_millis() as u64,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -84,23 +87,34 @@ impl HeartbeatTask {
|
||||
}
|
||||
|
||||
fn create_heartbeat_request(
|
||||
message: OutgoingMessage,
|
||||
self_peer: &Option<Peer>,
|
||||
message: Option<OutgoingMessage>,
|
||||
peer: Option<Peer>,
|
||||
start_time_ms: u64,
|
||||
) -> Option<HeartbeatRequest> {
|
||||
match outgoing_message_to_mailbox_message(message) {
|
||||
Ok(message) => {
|
||||
let req = HeartbeatRequest {
|
||||
mailbox_message: Some(message),
|
||||
peer: self_peer.clone(),
|
||||
..Default::default()
|
||||
};
|
||||
Some(req)
|
||||
}
|
||||
Err(e) => {
|
||||
let mailbox_message = match message.map(outgoing_message_to_mailbox_message) {
|
||||
Some(Ok(message)) => Some(message),
|
||||
Some(Err(e)) => {
|
||||
error!(e; "Failed to encode mailbox messages");
|
||||
None
|
||||
return None;
|
||||
}
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
||||
Some(HeartbeatRequest {
|
||||
mailbox_message,
|
||||
peer,
|
||||
info: Self::build_node_info(start_time_ms),
|
||||
..Default::default()
|
||||
})
|
||||
}
|
||||
|
||||
fn build_node_info(start_time_ms: u64) -> Option<NodeInfo> {
|
||||
let build_info = common_version::build_info();
|
||||
Some(NodeInfo {
|
||||
version: build_info.version.to_string(),
|
||||
git_commit: build_info.commit_short.to_string(),
|
||||
start_time_ms,
|
||||
})
|
||||
}
|
||||
|
||||
fn start_heartbeat_report(
|
||||
@@ -109,6 +123,7 @@ impl HeartbeatTask {
|
||||
mut outgoing_rx: mpsc::Receiver<OutgoingMessage>,
|
||||
) {
|
||||
let report_interval = self.report_interval;
|
||||
let start_time_ms = self.start_time_ms;
|
||||
let self_peer = Some(Peer {
|
||||
id: self.node_id,
|
||||
addr: self.peer_addr.clone(),
|
||||
@@ -124,18 +139,14 @@ impl HeartbeatTask {
|
||||
let req = tokio::select! {
|
||||
message = outgoing_rx.recv() => {
|
||||
if let Some(message) = message {
|
||||
Self::create_heartbeat_request(message, &self_peer)
|
||||
Self::create_heartbeat_request(Some(message), self_peer.clone(), start_time_ms)
|
||||
} else {
|
||||
// Receives None that means Sender was dropped, we need to break the current loop
|
||||
break
|
||||
}
|
||||
}
|
||||
_ = interval.tick() => {
|
||||
let req = HeartbeatRequest {
|
||||
peer: self_peer.clone(),
|
||||
..Default::default()
|
||||
};
|
||||
Some(req)
|
||||
Self::create_heartbeat_request(None, self_peer.clone(), start_time_ms)
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -110,14 +110,36 @@ impl HeartbeatTask {
|
||||
});
|
||||
}
|
||||
|
||||
fn build_node_info(start_time_ms: u64) -> NodeInfo {
|
||||
fn create_heartbeat_request(
|
||||
message: Option<OutgoingMessage>,
|
||||
peer: Option<Peer>,
|
||||
start_time_ms: u64,
|
||||
) -> Option<HeartbeatRequest> {
|
||||
let mailbox_message = match message.map(outgoing_message_to_mailbox_message) {
|
||||
Some(Ok(message)) => Some(message),
|
||||
Some(Err(e)) => {
|
||||
error!(e; "Failed to encode mailbox messages");
|
||||
return None;
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
|
||||
Some(HeartbeatRequest {
|
||||
mailbox_message,
|
||||
peer,
|
||||
info: Self::build_node_info(start_time_ms),
|
||||
..Default::default()
|
||||
})
|
||||
}
|
||||
|
||||
fn build_node_info(start_time_ms: u64) -> Option<NodeInfo> {
|
||||
let build_info = common_version::build_info();
|
||||
|
||||
NodeInfo {
|
||||
Some(NodeInfo {
|
||||
version: build_info.version.to_string(),
|
||||
git_commit: build_info.commit_short.to_string(),
|
||||
start_time_ms,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn start_heartbeat_report(
|
||||
@@ -141,21 +163,7 @@ impl HeartbeatTask {
|
||||
let req = tokio::select! {
|
||||
message = outgoing_rx.recv() => {
|
||||
if let Some(message) = message {
|
||||
match outgoing_message_to_mailbox_message(message) {
|
||||
Ok(message) => {
|
||||
let req = HeartbeatRequest {
|
||||
mailbox_message: Some(message),
|
||||
peer: self_peer.clone(),
|
||||
info: Some(Self::build_node_info(start_time_ms)),
|
||||
..Default::default()
|
||||
};
|
||||
Some(req)
|
||||
}
|
||||
Err(e) => {
|
||||
error!(e; "Failed to encode mailbox messages");
|
||||
None
|
||||
}
|
||||
}
|
||||
Self::create_heartbeat_request(Some(message), self_peer.clone(), start_time_ms)
|
||||
} else {
|
||||
// Receives None that means Sender was dropped, we need to break the current loop
|
||||
break
|
||||
@@ -163,12 +171,7 @@ impl HeartbeatTask {
|
||||
}
|
||||
_ = &mut sleep => {
|
||||
sleep.as_mut().reset(Instant::now() + Duration::from_millis(report_interval));
|
||||
let req = HeartbeatRequest {
|
||||
peer: self_peer.clone(),
|
||||
info: Some(Self::build_node_info(start_time_ms)),
|
||||
..Default::default()
|
||||
};
|
||||
Some(req)
|
||||
Self::create_heartbeat_request(None, self_peer.clone(), start_time_ms)
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user