From 214fd38f693ce1edbebdcc1c3ce4bfcdfe645612 Mon Sep 17 00:00:00 2001 From: Jeremyhi Date: Mon, 1 Jul 2024 11:19:25 +0800 Subject: [PATCH] feat: add build info for flow heartbeat task (#4228) * chore: refactor load region stats * feat: add build info for flow heartbeat --- Cargo.lock | 1 + src/datanode/src/datanode.rs | 1 - src/datanode/src/heartbeat.rs | 31 ++++++++----------- src/datanode/src/region_server.rs | 2 +- src/flow/Cargo.toml | 1 + src/flow/src/heartbeat.rs | 51 +++++++++++++++++++------------ src/frontend/src/heartbeat.rs | 51 ++++++++++++++++--------------- 7 files changed, 73 insertions(+), 65 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 68663ae65c..9d5a9a6ab2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3966,6 +3966,7 @@ dependencies = [ "common-runtime", "common-telemetry", "common-time", + "common-version", "datafusion 38.0.0", "datafusion-common 38.0.0", "datafusion-expr 38.0.0", diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 04dc6c196a..fa07c00adf 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -136,7 +136,6 @@ impl Datanode { if let Some(heartbeat_task) = &self.heartbeat_task { heartbeat_task .close() - .await .map_err(BoxedError::new) .context(ShutdownInstanceSnafu)?; } diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index ff872b7959..20eac6f8f5 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -158,7 +158,7 @@ impl HeartbeatTask { ctx: HeartbeatResponseHandlerContext, handler_executor: HeartbeatResponseHandlerExecutorRef, ) -> Result<()> { - trace!("heartbeat response: {:?}", ctx.response); + trace!("Heartbeat response: {:?}", ctx.response); handler_executor .handle(ctx) .await @@ -245,7 +245,7 @@ impl HeartbeatTask { } _ = &mut sleep => { let build_info = common_version::build_info(); - let region_stats = Self::load_region_stats(&region_server_clone).await; + let region_stats = 
Self::load_region_stats(&region_server_clone); let now = Instant::now(); let duration_since_epoch = (now - epoch).as_millis() as u64; let req = HeartbeatRequest { @@ -313,30 +313,23 @@ impl HeartbeatTask { Ok(()) } - async fn load_region_stats(region_server: &RegionServer) -> Vec { - let regions = region_server.reportable_regions(); - - let mut region_stats = Vec::new(); - for stat in regions { - let approximate_bytes = region_server - .region_disk_usage(stat.region_id) - .await - .unwrap_or(0); - let region_stat = RegionStat { + fn load_region_stats(region_server: &RegionServer) -> Vec { + region_server + .reportable_regions() + .into_iter() + .map(|stat| RegionStat { region_id: stat.region_id.as_u64(), engine: stat.engine, role: RegionRole::from(stat.role).into(), - approximate_bytes, - // TODO(ruihang): scratch more info + // TODO(jeremy): w/rcus rcus: 0, wcus: 0, - }; - region_stats.push(region_stat); - } - region_stats + approximate_bytes: region_server.region_disk_usage(stat.region_id).unwrap_or(0), + }) + .collect() } - pub async fn close(&self) -> Result<()> { + pub fn close(&self) -> Result<()> { let running = self.running.clone(); if running .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire) diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 8fa2eae383..83225334ea 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -290,7 +290,7 @@ impl RegionServer { self.inner.runtime.clone() } - pub async fn region_disk_usage(&self, region_id: RegionId) -> Option { + pub fn region_disk_usage(&self, region_id: RegionId) -> Option { match self.inner.region_map.get(&region_id) { Some(e) => e.region_disk_usage(region_id), None => None, diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml index fcf33e45fe..ebf2760695 100644 --- a/src/flow/Cargo.toml +++ b/src/flow/Cargo.toml @@ -26,6 +26,7 @@ common-recordbatch.workspace = true common-runtime.workspace = true common-telemetry.workspace 
= true common-time.workspace = true +common-version.workspace = true datafusion.workspace = true datafusion-common.workspace = true datafusion-expr.workspace = true diff --git a/src/flow/src/heartbeat.rs b/src/flow/src/heartbeat.rs index e46769aeb6..ed3fe66a86 100644 --- a/src/flow/src/heartbeat.rs +++ b/src/flow/src/heartbeat.rs @@ -24,6 +24,7 @@ use common_meta::heartbeat::handler::{ use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage}; use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message; use common_telemetry::{debug, error, info}; +use greptime_proto::v1::meta::NodeInfo; use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient}; use servers::addrs; use servers::heartbeat_options::HeartbeatOptions; @@ -43,6 +44,7 @@ pub struct HeartbeatTask { report_interval: Duration, retry_interval: Duration, resp_handler_executor: HeartbeatResponseHandlerExecutorRef, + start_time_ms: u64, } impl HeartbeatTask { @@ -59,6 +61,7 @@ impl HeartbeatTask { report_interval: heartbeat_opts.interval, retry_interval: heartbeat_opts.retry_interval, resp_handler_executor, + start_time_ms: common_time::util::current_time_millis() as u64, } } @@ -84,23 +87,34 @@ impl HeartbeatTask { } fn create_heartbeat_request( - message: OutgoingMessage, - self_peer: &Option, + message: Option, + peer: Option, + start_time_ms: u64, ) -> Option { - match outgoing_message_to_mailbox_message(message) { - Ok(message) => { - let req = HeartbeatRequest { - mailbox_message: Some(message), - peer: self_peer.clone(), - ..Default::default() - }; - Some(req) - } - Err(e) => { + let mailbox_message = match message.map(outgoing_message_to_mailbox_message) { + Some(Ok(message)) => Some(message), + Some(Err(e)) => { error!(e; "Failed to encode mailbox messages"); - None + return None; } - } + None => None, + }; + + Some(HeartbeatRequest { + mailbox_message, + peer, + info: Self::build_node_info(start_time_ms), + ..Default::default() + }) + } + + fn 
build_node_info(start_time_ms: u64) -> Option { + let build_info = common_version::build_info(); + Some(NodeInfo { + version: build_info.version.to_string(), + git_commit: build_info.commit_short.to_string(), + start_time_ms, + }) } fn start_heartbeat_report( @@ -109,6 +123,7 @@ impl HeartbeatTask { mut outgoing_rx: mpsc::Receiver, ) { let report_interval = self.report_interval; + let start_time_ms = self.start_time_ms; let self_peer = Some(Peer { id: self.node_id, addr: self.peer_addr.clone(), @@ -124,18 +139,14 @@ impl HeartbeatTask { let req = tokio::select! { message = outgoing_rx.recv() => { if let Some(message) = message { - Self::create_heartbeat_request(message, &self_peer) + Self::create_heartbeat_request(Some(message), self_peer.clone(), start_time_ms) } else { // Receives None that means Sender was dropped, we need to break the current loop break } } _ = interval.tick() => { - let req = HeartbeatRequest { - peer: self_peer.clone(), - ..Default::default() - }; - Some(req) + Self::create_heartbeat_request(None, self_peer.clone(), start_time_ms) } }; diff --git a/src/frontend/src/heartbeat.rs b/src/frontend/src/heartbeat.rs index 48c09bacd5..1bea71e87e 100644 --- a/src/frontend/src/heartbeat.rs +++ b/src/frontend/src/heartbeat.rs @@ -110,14 +110,36 @@ impl HeartbeatTask { }); } - fn build_node_info(start_time_ms: u64) -> NodeInfo { + fn create_heartbeat_request( + message: Option, + peer: Option, + start_time_ms: u64, + ) -> Option { + let mailbox_message = match message.map(outgoing_message_to_mailbox_message) { + Some(Ok(message)) => Some(message), + Some(Err(e)) => { + error!(e; "Failed to encode mailbox messages"); + return None; + } + None => None, + }; + + Some(HeartbeatRequest { + mailbox_message, + peer, + info: Self::build_node_info(start_time_ms), + ..Default::default() + }) + } + + fn build_node_info(start_time_ms: u64) -> Option { let build_info = common_version::build_info(); - NodeInfo { + Some(NodeInfo { version: 
build_info.version.to_string(), git_commit: build_info.commit_short.to_string(), start_time_ms, - } + }) } fn start_heartbeat_report( @@ -141,21 +163,7 @@ impl HeartbeatTask { let req = tokio::select! { message = outgoing_rx.recv() => { if let Some(message) = message { - match outgoing_message_to_mailbox_message(message) { - Ok(message) => { - let req = HeartbeatRequest { - mailbox_message: Some(message), - peer: self_peer.clone(), - info: Some(Self::build_node_info(start_time_ms)), - ..Default::default() - }; - Some(req) - } - Err(e) => { - error!(e; "Failed to encode mailbox messages"); - None - } - } + Self::create_heartbeat_request(Some(message), self_peer.clone(), start_time_ms) } else { // Receives None that means Sender was dropped, we need to break the current loop break @@ -163,12 +171,7 @@ impl HeartbeatTask { } _ = &mut sleep => { sleep.as_mut().reset(Instant::now() + Duration::from_millis(report_interval)); - let req = HeartbeatRequest { - peer: self_peer.clone(), - info: Some(Self::build_node_info(start_time_ms)), - ..Default::default() - }; - Some(req) + Self::create_heartbeat_request(None, self_peer.clone(), start_time_ms) } };