From f6f617d6672d0d854b0a5dd052c8e40f4add48d5 Mon Sep 17 00:00:00 2001
From: LFC <990479+MichaelScofield@users.noreply.github.com>
Date: Thu, 20 Feb 2025 11:55:18 +0800
Subject: [PATCH] feat: submit node's cpu cores number to metasrv in heartbeat
 (#5571)

* feat: submit node's cpu cores number to metasrv in heartbeat

* update greptime-proto dep
---
 Cargo.lock                                    |  5 +-
 Cargo.toml                                    |  2 +-
 src/cmd/src/metasrv.rs                        |  2 +-
 src/common/meta/src/cluster.rs                | 58 +++++++++++++-
 src/datanode/Cargo.toml                       |  1 +
 src/datanode/src/heartbeat.rs                 | 28 ++++---
 src/flow/Cargo.toml                           |  1 +
 src/flow/src/heartbeat.rs                     | 20 +++--
 src/frontend/Cargo.toml                       |  1 +
 src/frontend/src/heartbeat.rs                 | 20 +++--
 .../handler/collect_cluster_info_handler.rs   | 76 ++-----------------
 11 files changed, 112 insertions(+), 102 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 802f2567d9..a749e6a288 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3376,6 +3376,7 @@ dependencies = [
  "meta-client",
  "metric-engine",
  "mito2",
+ "num_cpus",
  "object-store",
  "prometheus",
  "prost 0.13.3",
@@ -4196,6 +4197,7 @@ dependencies = [
  "meta-client",
  "nom",
  "num-traits",
+ "num_cpus",
  "operator",
  "partition",
  "pretty_assertions",
@@ -4302,6 +4304,7 @@ dependencies = [
  "log-query",
  "log-store",
  "meta-client",
+ "num_cpus",
  "opentelemetry-proto 0.27.0",
  "operator",
  "partition",
@@ -4692,7 +4695,7 @@ dependencies = [
 [[package]]
 name = "greptime-proto"
 version = "0.1.0"
-source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=fc09a5696608d2a0aa718cc835d5cb9c4e8e9387#fc09a5696608d2a0aa718cc835d5cb9c4e8e9387"
+source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=a25adc8a01340231121646d8f0a29d0e92f45461#a25adc8a01340231121646d8f0a29d0e92f45461"
 dependencies = [
  "prost 0.13.3",
  "serde",
diff --git a/Cargo.toml b/Cargo.toml
index c74e743ad9..a59de62bb6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -129,7 +129,7 @@ etcd-client = "0.14"
 fst = "0.4.7"
 futures = "0.3"
 futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "fc09a5696608d2a0aa718cc835d5cb9c4e8e9387" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a25adc8a01340231121646d8f0a29d0e92f45461" }
 hex = "0.4"
 http = "1"
 humantime = "2.1"
diff --git a/src/cmd/src/metasrv.rs b/src/cmd/src/metasrv.rs
index 127defe031..063f36ffa4 100644
--- a/src/cmd/src/metasrv.rs
+++ b/src/cmd/src/metasrv.rs
@@ -42,7 +42,7 @@ pub struct Instance {
 }
 
 impl Instance {
-    fn new(instance: MetasrvInstance, guard: Vec<WorkerGuard>) -> Self {
+    pub fn new(instance: MetasrvInstance, guard: Vec<WorkerGuard>) -> Self {
         Self {
             instance,
             _guard: guard,
diff --git a/src/common/meta/src/cluster.rs b/src/common/meta/src/cluster.rs
index 6e9d9c8ef3..bb2429c0e6 100644
--- a/src/common/meta/src/cluster.rs
+++ b/src/common/meta/src/cluster.rs
@@ -12,8 +12,10 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::hash::{DefaultHasher, Hash, Hasher};
 use std::str::FromStr;
 
+use api::v1::meta::HeartbeatRequest;
 use common_error::ext::ErrorExt;
 use lazy_static::lazy_static;
 use regex::Regex;
@@ -58,7 +60,7 @@ pub trait ClusterInfo {
 ///
 /// This key cannot be used to describe the `Metasrv` because the `Metasrv` does not have
 /// a `cluster_id`, it serves multiple clusters.
-#[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)]
+#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)]
 pub struct NodeInfoKey {
     /// The cluster id.
     pub cluster_id: ClusterId,
@@ -69,6 +71,28 @@ pub struct NodeInfoKey {
 }
 
 impl NodeInfoKey {
+    /// Tries to create a `NodeInfoKey` from a "good" heartbeat request, where "good" means
+    /// every needed piece of information is provided and valid.
+    pub fn new(request: &HeartbeatRequest) -> Option<Self> {
+        let HeartbeatRequest { header, peer, .. } = request;
+        let header = header.as_ref()?;
+        let peer = peer.as_ref()?;
+
+        let role = header.role.try_into().ok()?;
+        let node_id = match role {
+            // Because the frontend is stateless, it is easy to forget to assign it a unique id
+            // when setting up a cluster, so we derive its id from its address instead.
+            Role::Frontend => calculate_node_id(&peer.addr),
+            _ => peer.id,
+        };
+
+        Some(NodeInfoKey {
+            cluster_id: header.cluster_id,
+            role,
+            node_id,
+        })
+    }
+
     pub fn key_prefix_with_cluster_id(cluster_id: u64) -> String {
         format!("{}-{}-", CLUSTER_NODE_INFO_PREFIX, cluster_id)
     }
@@ -83,6 +107,13 @@ impl NodeInfoKey {
     }
 }
 
+/// Calculates the node's id from its address, using the `DefaultHasher`.
+fn calculate_node_id(addr: &str) -> u64 {
+    let mut hasher = DefaultHasher::new();
+    addr.hash(&mut hasher);
+    hasher.finish()
+}
+
 /// The information of a node in the cluster.
 #[derive(Debug, Serialize, Deserialize)]
 pub struct NodeInfo {
@@ -100,7 +131,7 @@ pub struct NodeInfo {
     pub start_time_ms: u64,
 }
 
-#[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)]
+#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)]
 pub enum Role {
     Datanode,
     Frontend,
@@ -271,6 +302,7 @@ impl TryFrom<i32> for Role {
 mod tests {
     use std::assert_matches::assert_matches;
 
+    use super::*;
     use crate::cluster::Role::{Datanode, Frontend};
     use crate::cluster::{DatanodeStatus, NodeInfo, NodeInfoKey, NodeStatus};
     use crate::peer::Peer;
@@ -338,4 +370,26 @@
         let prefix = NodeInfoKey::key_prefix_with_role(2, Frontend);
         assert_eq!(prefix, "__meta_cluster_node_info-2-1-");
     }
+
+    #[test]
+    fn test_calculate_node_id_from_addr() {
+        // Test empty string
+        assert_eq!(calculate_node_id(""), calculate_node_id(""));
+
+        // Test same addresses return same ids
+        let addr1 = "127.0.0.1:8080";
+        let id1 = calculate_node_id(addr1);
+        let id2 = calculate_node_id(addr1);
+        assert_eq!(id1, id2);
+
+        // Test different addresses return different ids
+        let addr2 = "127.0.0.1:8081";
+        let id3 = calculate_node_id(addr2);
+        assert_ne!(id1, id3);
+
+        // Test long address
+        let long_addr = "very.long.domain.name.example.com:9999";
+        let id4 = calculate_node_id(long_addr);
+        assert!(id4 > 0);
+    }
 }
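The `calculate_node_id` helper above is the heart of the frontend-id change. A minimal standalone sketch (not part of the patch) of the property the new test relies on: `DefaultHasher::new()` is constructed with fixed keys, so the derived id is stable for a given address across heartbeats and restarts of the same binary (the algorithm itself is unspecified across Rust releases). The addresses below are hypothetical.

```rust
use std::hash::{DefaultHasher, Hash, Hasher};

// Standalone copy of the patch's `calculate_node_id`, for illustration only.
fn calculate_node_id(addr: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    addr.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    // Two frontends deployed without explicit ids still get distinct keys,
    // because the id is derived from the address alone.
    let a = calculate_node_id("10.0.0.1:4001"); // hypothetical address
    let b = calculate_node_id("10.0.0.2:4001"); // hypothetical address
    assert_ne!(a, b);
    // Same address, same id: repeated heartbeats map to one `NodeInfoKey`.
    assert_eq!(a, calculate_node_id("10.0.0.1:4001"));
}
```

The id only has to keep the key unique, so a 64-bit hash collision between two distinct frontend addresses, while possible in principle, is negligible in practice.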
diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml
index fc6edbb651..3d0bfdda9b 100644
--- a/src/datanode/Cargo.toml
+++ b/src/datanode/Cargo.toml
@@ -47,6 +47,7 @@ log-store.workspace = true
 meta-client.workspace = true
 metric-engine.workspace = true
 mito2.workspace = true
+num_cpus.workspace = true
 object-store.workspace = true
 prometheus.workspace = true
 prost.workspace = true
diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs
index 1db411ddff..04d38a3524 100644
--- a/src/datanode/src/heartbeat.rs
+++ b/src/datanode/src/heartbeat.rs
@@ -224,6 +224,20 @@ impl HeartbeatTask {
         common_runtime::spawn_hb(async move {
             let sleep = tokio::time::sleep(Duration::from_millis(0));
             tokio::pin!(sleep);
+
+            let build_info = common_version::build_info();
+            let heartbeat_request = HeartbeatRequest {
+                peer: self_peer,
+                node_epoch,
+                info: Some(NodeInfo {
+                    version: build_info.version.to_string(),
+                    git_commit: build_info.commit_short.to_string(),
+                    start_time_ms: node_epoch,
+                    cpus: num_cpus::get() as u32,
+                }),
+                ..Default::default()
+            };
+
             loop {
                 if !running.load(Ordering::Relaxed) {
                     info!("shutdown heartbeat task");
@@ -235,9 +249,8 @@ impl HeartbeatTask {
                     match outgoing_message_to_mailbox_message(message) {
                         Ok(message) => {
                             let req = HeartbeatRequest {
-                                peer: self_peer.clone(),
                                 mailbox_message: Some(message),
-                                ..Default::default()
+                                ..heartbeat_request.clone()
                             };
                             HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
                             Some(req)
@@ -253,22 +266,13 @@ impl HeartbeatTask {
                     }
                 }
                 _ = &mut sleep => {
-                    let build_info = common_version::build_info();
                     let region_stats = Self::load_region_stats(&region_server_clone);
                     let now = Instant::now();
                     let duration_since_epoch = (now - epoch).as_millis() as u64;
                     let req = HeartbeatRequest {
-                        peer: self_peer.clone(),
                         region_stats,
                         duration_since_epoch,
-                        node_epoch,
-                        info: Some(NodeInfo {
-                            version: build_info.version.to_string(),
-                            git_commit: build_info.commit_short.to_string(),
-                            // The start timestamp is the same as node_epoch currently.
-                            start_time_ms: node_epoch,
-                        }),
-                        ..Default::default()
+                        ..heartbeat_request.clone()
                     };
                     sleep.as_mut().reset(now + Duration::from_millis(interval));
                     Some(req)
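The datanode change follows one pattern: hoist the fields that never change (peer, node epoch, build info, and the new `cpus` count) into a template request built once before the loop, then fill in only the per-tick fields with struct-update syntax. A minimal sketch of that pattern with stand-in types (the real `HeartbeatRequest`/`NodeInfo` are prost-generated in greptime-proto and have more fields), assuming the `num_cpus` crate this PR adds as a dependency:

```rust
// Stand-in types for illustration; field values below are hypothetical.
#[derive(Clone, Default)]
struct NodeInfo {
    version: String,
    cpus: u32,
}

#[derive(Clone, Default)]
struct HeartbeatRequest {
    info: Option<NodeInfo>,
    node_epoch: u64,
    duration_since_epoch: u64,
}

fn main() {
    // Build the invariant part of the request once, before the loop starts.
    let template = HeartbeatRequest {
        node_epoch: 1_740_000_000_000, // hypothetical start timestamp (ms)
        info: Some(NodeInfo {
            version: "0.12.0".to_string(), // hypothetical build version
            cpus: num_cpus::get() as u32,  // the new field reported by this PR
        }),
        ..Default::default()
    };

    // Per tick, clone the template and overwrite only what varies.
    let req = HeartbeatRequest {
        duration_since_epoch: 5_000,
        ..template.clone()
    };
    assert_eq!(req.node_epoch, template.node_epoch);
    assert!(req.info.is_some());
}
```

Besides removing the repeated `NodeInfo` construction, this also fixes an inconsistency in the old code: the mailbox-message branch used to send a request with only `peer` set, while the template now carries the full node info on every heartbeat.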
diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml
index df62097e2d..461e4382e5 100644
--- a/src/flow/Cargo.toml
+++ b/src/flow/Cargo.toml
@@ -53,6 +53,7 @@ lazy_static.workspace = true
 meta-client.workspace = true
 nom = "7.1.3"
 num-traits = "0.2"
+num_cpus.workspace = true
 operator.workspace = true
 partition.workspace = true
 prometheus.workspace = true
diff --git a/src/flow/src/heartbeat.rs b/src/flow/src/heartbeat.rs
index 4368b5d8fd..54164fef3d 100644
--- a/src/flow/src/heartbeat.rs
+++ b/src/flow/src/heartbeat.rs
@@ -134,10 +134,9 @@ impl HeartbeatTask {
         }
     }
 
-    fn create_heartbeat_request(
+    fn new_heartbeat_request(
+        heartbeat_request: &HeartbeatRequest,
         message: Option<OutgoingMessage>,
-        peer: Option<Peer>,
-        start_time_ms: u64,
         latest_report: &Option<FlowStat>,
     ) -> Option<HeartbeatRequest> {
         let mailbox_message = match message.map(outgoing_message_to_mailbox_message) {
@@ -161,10 +160,8 @@ impl HeartbeatTask {
 
         Some(HeartbeatRequest {
             mailbox_message,
-            peer,
-            info: Self::build_node_info(start_time_ms),
             flow_stat,
-            ..Default::default()
+            ..heartbeat_request.clone()
         })
     }
 
@@ -174,6 +171,7 @@ impl HeartbeatTask {
             version: build_info.version.to_string(),
             git_commit: build_info.commit_short.to_string(),
             start_time_ms,
+            cpus: num_cpus::get() as u32,
         })
     }
 
@@ -198,18 +196,24 @@ impl HeartbeatTask {
         interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
 
         let mut latest_report = None;
 
+        let heartbeat_request = HeartbeatRequest {
+            peer: self_peer,
+            info: Self::build_node_info(start_time_ms),
+            ..Default::default()
+        };
+
         loop {
             let req = tokio::select! {
                 message = outgoing_rx.recv() => {
                     if let Some(message) = message {
-                        Self::create_heartbeat_request(Some(message), self_peer.clone(), start_time_ms, &latest_report)
+                        Self::new_heartbeat_request(&heartbeat_request, Some(message), &latest_report)
                     } else {
                         // Receives None that means Sender was dropped, we need to break the current loop
                         break
                     }
                 }
                 _ = interval.tick() => {
-                    Self::create_heartbeat_request(None, self_peer.clone(), start_time_ms, &latest_report)
+                    Self::new_heartbeat_request(&heartbeat_request, None, &latest_report)
                 }
             };
diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml
index 6d01e6ac12..2cefc12b2d 100644
--- a/src/frontend/Cargo.toml
+++ b/src/frontend/Cargo.toml
@@ -44,6 +44,7 @@ lazy_static.workspace = true
 log-query.workspace = true
 log-store.workspace = true
 meta-client.workspace = true
+num_cpus.workspace = true
 opentelemetry-proto.workspace = true
 operator.workspace = true
 partition.workspace = true
diff --git a/src/frontend/src/heartbeat.rs b/src/frontend/src/heartbeat.rs
index 7dc623daaa..47dac786b2 100644
--- a/src/frontend/src/heartbeat.rs
+++ b/src/frontend/src/heartbeat.rs
@@ -118,10 +118,9 @@ impl HeartbeatTask {
         });
     }
 
-    fn create_heartbeat_request(
+    fn new_heartbeat_request(
+        heartbeat_request: &HeartbeatRequest,
         message: Option<OutgoingMessage>,
-        peer: Option<Peer>,
-        start_time_ms: u64,
     ) -> Option<HeartbeatRequest> {
         let mailbox_message = match message.map(outgoing_message_to_mailbox_message) {
             Some(Ok(message)) => Some(message),
@@ -134,9 +133,7 @@ impl HeartbeatTask {
 
         Some(HeartbeatRequest {
             mailbox_message,
-            peer,
-            info: Self::build_node_info(start_time_ms),
-            ..Default::default()
+            ..heartbeat_request.clone()
         })
     }
 
@@ -147,6 +144,7 @@ impl HeartbeatTask {
             version: build_info.version.to_string(),
             git_commit: build_info.commit_short.to_string(),
             start_time_ms,
+            cpus: num_cpus::get() as u32,
         })
     }
 
@@ -167,11 +165,17 @@ impl HeartbeatTask {
         let sleep = tokio::time::sleep(Duration::from_millis(0));
         tokio::pin!(sleep);
 
+        let heartbeat_request = HeartbeatRequest {
+            peer: self_peer,
+            info: Self::build_node_info(start_time_ms),
+            ..Default::default()
+        };
+
         loop {
             let req = tokio::select! {
                 message = outgoing_rx.recv() => {
                     if let Some(message) = message {
-                        Self::create_heartbeat_request(Some(message), self_peer.clone(), start_time_ms)
+                        Self::new_heartbeat_request(&heartbeat_request, Some(message))
                     } else {
                         // Receives None that means Sender was dropped, we need to break the current loop
                         break
                     }
                 }
                 _ = &mut sleep => {
                     sleep.as_mut().reset(Instant::now() + Duration::from_millis(report_interval));
-                    Self::create_heartbeat_request(None, self_peer.clone(), start_time_ms)
+                    Self::new_heartbeat_request(&heartbeat_request, None)
                 }
             };
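The flow and frontend tasks above mirror the datanode refactor, and all three keep the same loop shape: a `tokio::select!` over the outgoing-mailbox channel and a timer, where a closed channel shuts the task down. A self-contained sketch of that shape, with illustrative names and intervals (not the patch's actual code):

```rust
use std::time::Duration;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<String>(8);
    let mut interval = tokio::time::interval(Duration::from_millis(50));
    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

    tokio::spawn(async move {
        // Some component asks the heartbeat task to piggyback a mailbox message.
        outgoing_tx.send("mailbox message".to_string()).await.ok();
        // Dropping `outgoing_tx` here makes `recv()` return None below.
    });

    loop {
        tokio::select! {
            message = outgoing_rx.recv() => {
                match message {
                    // Attach the message to the next heartbeat request.
                    Some(m) => println!("heartbeat with mailbox message: {m}"),
                    // Sender dropped: shut the heartbeat task down.
                    None => break,
                }
            }
            _ = interval.tick() => {
                // Periodic heartbeat without a mailbox message.
                println!("plain heartbeat tick");
            }
        }
    }
}
```

With the template request threaded through as a parameter, neither branch needs to re-clone the peer or rebuild the node info; the only remaining difference between them is whether a mailbox message is attached.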
diff --git a/src/meta-srv/src/handler/collect_cluster_info_handler.rs b/src/meta-srv/src/handler/collect_cluster_info_handler.rs
index 016757ecfd..0723ae9cad 100644
--- a/src/meta-srv/src/handler/collect_cluster_info_handler.rs
+++ b/src/meta-srv/src/handler/collect_cluster_info_handler.rs
@@ -12,11 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::hash_map::DefaultHasher;
-use std::hash::{Hash, Hasher};
-
 use api::v1::meta::{HeartbeatRequest, NodeInfo as PbNodeInfo, Role};
-use common_meta::cluster;
 use common_meta::cluster::{
     DatanodeStatus, FlownodeStatus, FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus,
 };
@@ -45,7 +41,7 @@ impl HeartbeatHandler for CollectFrontendClusterInfoHandler {
         ctx: &mut Context,
         _acc: &mut HeartbeatAccumulator,
     ) -> Result<HandleControl> {
-        let Some((key, peer, info)) = extract_base_info(req, Role::Frontend) else {
+        let Some((key, peer, info)) = extract_base_info(req) else {
             return Ok(HandleControl::Continue);
         };
@@ -78,7 +74,7 @@ impl HeartbeatHandler for CollectFlownodeClusterInfoHandler {
         ctx: &mut Context,
         _acc: &mut HeartbeatAccumulator,
     ) -> Result<HandleControl> {
-        let Some((key, peer, info)) = extract_base_info(req, Role::Flownode) else {
+        let Some((key, peer, info)) = extract_base_info(req) else {
             return Ok(HandleControl::Continue);
         };
@@ -112,7 +108,7 @@ impl HeartbeatHandler for CollectDatanodeClusterInfoHandler {
         ctx: &mut Context,
         acc: &mut HeartbeatAccumulator,
     ) -> Result<HandleControl> {
-        let Some((key, peer, info)) = extract_base_info(req, Role::Datanode) else {
+        let Some((key, peer, info)) = extract_base_info(req) else {
             return Ok(HandleControl::Continue);
         };
@@ -147,16 +143,9 @@ impl HeartbeatHandler for CollectDatanodeClusterInfoHandler {
     }
 }
 
-fn extract_base_info(
-    req: &HeartbeatRequest,
-    role: Role,
-) -> Option<(NodeInfoKey, Peer, PbNodeInfo)> {
-    let HeartbeatRequest {
-        header, peer, info, ..
-    } = req;
-    let Some(header) = &header else {
-        return None;
-    };
+fn extract_base_info(request: &HeartbeatRequest) -> Option<(NodeInfoKey, Peer, PbNodeInfo)> {
+    let HeartbeatRequest { peer, info, .. } = request;
+    let key = NodeInfoKey::new(request)?;
     let Some(peer) = &peer else {
         return None;
     };
@@ -164,24 +153,7 @@ fn extract_base_info(
         return None;
     };
 
-    Some((
-        NodeInfoKey {
-            cluster_id: header.cluster_id,
-            role: match role {
-                Role::Datanode => cluster::Role::Datanode,
-                Role::Frontend => cluster::Role::Frontend,
-                Role::Flownode => cluster::Role::Flownode,
-            },
-            node_id: match role {
-                Role::Datanode => peer.id,
-                Role::Flownode => peer.id,
-                // The ID is solely for ensuring the key's uniqueness and serves no other purpose.
-                Role::Frontend => allocate_id_by_peer_addr(peer.addr.as_str()),
-            },
-        },
-        Peer::from(peer.clone()),
-        info.clone(),
-    ))
+    Some((key, Peer::from(peer.clone()), info.clone()))
 }
 
 async fn put_into_memory_store(ctx: &mut Context, key: NodeInfoKey, value: NodeInfo) -> Result<()> {
@@ -200,37 +172,3 @@
 
     Ok(())
 }
-
-// Allocate id based on peer address using a hash function
-fn allocate_id_by_peer_addr(peer_addr: &str) -> u64 {
-    let mut hasher = DefaultHasher::new();
-    peer_addr.hash(&mut hasher);
-    hasher.finish()
-}
-
-#[cfg(test)]
-mod allocate_id_tests {
-    use super::*;
-
-    #[test]
-    fn test_allocate_id_by_peer_addr() {
-        // Test empty string
-        assert_eq!(allocate_id_by_peer_addr(""), allocate_id_by_peer_addr(""));
-
-        // Test same address returns same id
-        let addr1 = "127.0.0.1:8080";
-        let id1 = allocate_id_by_peer_addr(addr1);
-        let id2 = allocate_id_by_peer_addr(addr1);
-        assert_eq!(id1, id2);
-
-        // Test different addresses return different ids
-        let addr2 = "127.0.0.1:8081";
-        let id3 = allocate_id_by_peer_addr(addr2);
-        assert_ne!(id1, id3);
-
-        // Test long address
-        let long_addr = "very.long.domain.name.example.com:9999";
-        let id4 = allocate_id_by_peer_addr(long_addr);
-        assert!(id4 > 0);
-    }
-}
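With key construction consolidated in `NodeInfoKey::new`, the handlers no longer pass a role into `extract_base_info`; the role, node id, and cluster id all come from the request itself. A test-style sketch of the new behavior, assuming it sits where the `api` and `common-meta` crates are available (field values are hypothetical, and the prost-generated types default the fields not shown):

```rust
use api::v1::meta::{HeartbeatRequest, Peer, RequestHeader, Role};
use common_meta::cluster::NodeInfoKey;

fn main() {
    // A frontend heartbeat whose `peer.id` was left at the default 0.
    let request = HeartbeatRequest {
        header: Some(RequestHeader {
            cluster_id: 1,
            role: Role::Frontend as i32,
            ..Default::default()
        }),
        peer: Some(Peer {
            id: 0,
            addr: "10.0.0.1:4001".to_string(), // hypothetical address
        }),
        ..Default::default()
    };

    // The key's node_id is derived from the address, not from `peer.id`,
    // so two frontends that both report the default id do not collide.
    let key = NodeInfoKey::new(&request).expect("valid heartbeat request");
    assert_ne!(key.node_id, 0);
}
```

Moving the hashing into `common-meta` also means the datanode, flownode, and frontend handlers share one code path, and the duplicated `allocate_id_by_peer_addr` helper and its tests could be deleted from meta-srv wholesale.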