diff --git a/src/common/meta/src/cluster.rs b/src/common/meta/src/cluster.rs index 5ab2b87399..149e5a059d 100644 --- a/src/common/meta/src/cluster.rs +++ b/src/common/meta/src/cluster.rs @@ -17,7 +17,7 @@ use std::fmt::{Display, Formatter}; use std::hash::{DefaultHasher, Hash, Hasher}; use std::str::FromStr; -use api::v1::meta::{DatanodeWorkloads, FlownodeWorkloads, HeartbeatRequest}; +use api::v1::meta::{DatanodeWorkloads, FlownodeWorkloads, FrontendWorkloads, HeartbeatRequest}; use common_error::ext::ErrorExt; use lazy_static::lazy_static; use regex::Regex; @@ -186,8 +186,12 @@ pub struct DatanodeStatus { } /// The status of a frontend. -#[derive(Debug, Serialize, Deserialize)] -pub struct FrontendStatus {} +#[derive(Debug, Default, Serialize, Deserialize)] +pub struct FrontendStatus { + /// The workloads of the frontend. + #[serde(default)] + pub workloads: FrontendWorkloads, +} /// The status of a flownode. #[derive(Debug, Serialize, Deserialize)] diff --git a/src/common/meta/src/heartbeat/utils.rs b/src/common/meta/src/heartbeat/utils.rs index f1c3eec1af..316bafbc30 100644 --- a/src/common/meta/src/heartbeat/utils.rs +++ b/src/common/meta/src/heartbeat/utils.rs @@ -14,7 +14,7 @@ use api::v1::meta::heartbeat_request::NodeWorkloads; use api::v1::meta::mailbox_message::Payload; -use api::v1::meta::{DatanodeWorkloads, FlownodeWorkloads, MailboxMessage}; +use api::v1::meta::{DatanodeWorkloads, FlownodeWorkloads, FrontendWorkloads, MailboxMessage}; use common_telemetry::tracing_context::TracingContext; use common_telemetry::warn; use common_time::util::current_time_millis; @@ -90,6 +90,16 @@ pub fn get_datanode_workloads(node_workloads: Option<&NodeWorkloads>) -> Datanod } } +/// Extracts frontend workloads from the provided optional `NodeWorkloads`. +/// +/// Returns empty frontend workloads if the input is `None` or not a frontend payload. +pub fn get_frontend_workloads(node_workloads: Option<&NodeWorkloads>) -> FrontendWorkloads { + match node_workloads { + Some(NodeWorkloads::Frontend(frontend_workloads)) => frontend_workloads.clone(), + _ => FrontendWorkloads { types: vec![] }, + } +} + /// Extracts flownode workloads from the provided optional `NodeWorkloads`. /// /// Returns empty flownode workloads if the input is `None` or not a flownode payload. @@ -124,4 +134,16 @@ mod tests { let workloads = get_flownode_workloads(None); assert!(workloads.types.is_empty()); } + + #[test] + fn test_get_frontend_workloads() { + let node_workloads = Some(NodeWorkloads::Frontend(FrontendWorkloads { + types: vec![7], + })); + let workloads = get_frontend_workloads(node_workloads.as_ref()); + assert_eq!(workloads.types, vec![7]); + + let workloads = get_frontend_workloads(None); + assert!(workloads.types.is_empty()); + } } diff --git a/src/frontend/src/heartbeat.rs b/src/frontend/src/heartbeat.rs index 5ce695b7ce..4dca46d9fb 100644 --- a/src/frontend/src/heartbeat.rs +++ b/src/frontend/src/heartbeat.rs @@ -17,7 +17,8 @@ mod tests; use std::sync::Arc; -use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer}; +use api::v1::meta::heartbeat_request::NodeWorkloads; +use api::v1::meta::{FrontendWorkloads, HeartbeatRequest, NodeInfo, Peer}; use common_meta::datanode::EnvVars; use common_meta::heartbeat::handler::{ HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef, @@ -214,6 +215,7 @@ impl HeartbeatTask { total_cpu_millicores, total_memory_bytes, ), + node_workloads: Some(NodeWorkloads::Frontend(FrontendWorkloads { types: vec![] })), extensions, ..Default::default() }; diff --git a/src/meta-client/src/client/util.rs b/src/meta-client/src/client/util.rs index 61a80a83e3..62566f4378 100644 --- a/src/meta-client/src/client/util.rs +++ b/src/meta-client/src/client/util.rs @@ -142,7 +142,7 @@ mod tests { fn node_info(role: Role, id: u64, addr: &str, last_activity_ts: i64) -> NodeInfo { let status = match role { - Role::Frontend => NodeStatus::Frontend(FrontendStatus {}), + Role::Frontend => NodeStatus::Frontend(FrontendStatus::default()), Role::Datanode => NodeStatus::Datanode(DatanodeStatus { rcus: 0, wcus: 0, diff --git a/src/meta-srv/src/discovery/lease.rs b/src/meta-srv/src/discovery/lease.rs index f3c603af00..585676d41b 100644 --- a/src/meta-srv/src/discovery/lease.rs +++ b/src/meta-srv/src/discovery/lease.rs @@ -97,8 +97,7 @@ impl LeaseValueAccessor for MetaPeerClient { mod tests { use std::collections::HashMap; - use api::v1::meta::heartbeat_request::NodeWorkloads; - use api::v1::meta::{DatanodeWorkloads, FlownodeWorkloads}; + use api::v1::meta::{DatanodeWorkloads, FlownodeWorkloads, FrontendWorkloads}; use common_meta::cluster::{ DatanodeStatus, FlownodeStatus, FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus, Role, }; @@ -110,24 +109,8 @@ mod tests { use common_workload::DatanodeWorkloadType; use crate::discovery::utils::accept_ingest_workload; - use crate::key::{DatanodeLeaseKey, LeaseValue}; use crate::test_util::create_meta_peer_client; - async fn put_lease_value( - kv_backend: &ResettableKvBackendRef, - key: DatanodeLeaseKey, - value: LeaseValue, - ) { - kv_backend - .put(PutRequest { - key: key.try_into().unwrap(), - value: value.try_into().unwrap(), - prev_kv: false, - }) - .await - .unwrap(); - } - async fn put_node_info(kv_backend: &ResettableKvBackendRef, key: NodeInfoKey, value: NodeInfo) { kv_backend .put(PutRequest { @@ -139,72 +122,6 @@ mod tests { .unwrap(); } - #[tokio::test] - async fn test_active_datanodes_uses_lease_liveness_with_stale_node_info() { - let client = create_meta_peer_client(); - let in_memory = client.memory_backend(); - let lease = default_distributed_time_constants().datanode_lease; - - let mut env_vars = HashMap::new(); - env_vars.insert("AZ".to_string(), "az-a".to_string()); - - put_node_info( - &in_memory, - NodeInfoKey { - role: Role::Datanode, - node_id: 1, - }, - NodeInfo { - peer: Peer::new(1, "127.0.0.1:4001".to_string()), - last_activity_ts: current_time_millis() - (lease.as_millis() * 2) as i64, - status: NodeStatus::Datanode(DatanodeStatus { - rcus: 0, - wcus: 0, - leader_regions: 0, - follower_regions: 0, - workloads: DatanodeWorkloads { - types: vec![i32::MAX], - }, - }), - version: String::new(), - git_commit: String::new(), - start_time_ms: 0, - total_cpu_millicores: 0, - total_memory_bytes: 0, - cpu_usage_millicores: 0, - memory_usage_bytes: 0, - hostname: String::new(), - env_vars, - }, - ) - .await; - - put_lease_value( - &in_memory, - DatanodeLeaseKey { node_id: 1 }, - LeaseValue { - timestamp_millis: current_time_millis(), - node_addr: "127.0.0.1:4001".to_string(), - workloads: NodeWorkloads::Datanode(DatanodeWorkloads { - types: vec![DatanodeWorkloadType::Hybrid as i32], - }), - }, - ) - .await; - - let nodes = client - .active_datanodes(Some(accept_ingest_workload)) - .await - .unwrap(); - - assert_eq!(nodes.len(), 1); - assert_eq!(nodes[0].peer.id, 1); - assert_eq!( - nodes[0].env_vars.get("AZ").map(String::as_str), - Some("az-a") - ); - } - #[tokio::test] async fn test_active_datanodes_returns_node_info_with_env_vars() { let client = create_meta_peer_client(); @@ -243,19 +160,6 @@ mod tests { }, ) .await; - put_lease_value( - &in_memory, - DatanodeLeaseKey { node_id: 1 }, - LeaseValue { - timestamp_millis: current_time_millis(), - node_addr: "127.0.0.1:4001".to_string(), - workloads: NodeWorkloads::Datanode(DatanodeWorkloads { - types: vec![DatanodeWorkloadType::Hybrid as i32], - }), - }, - ) - .await; - let nodes = client .active_datanodes(Some(accept_ingest_workload)) .await @@ -320,7 +224,9 @@ mod tests { addr: "127.0.0.1:20201".to_string(), }, last_activity_ts, - status: NodeStatus::Frontend(FrontendStatus {}), + status: NodeStatus::Frontend(FrontendStatus { + workloads: FrontendWorkloads { types: vec![] }, + }), version: "1.0.0".to_string(), git_commit: "1234567890".to_string(), start_time_ms: last_activity_ts as u64, diff --git a/src/meta-srv/src/discovery/utils.rs b/src/meta-srv/src/discovery/utils.rs index 2db2d73881..0dda487f88 100644 --- a/src/meta-srv/src/discovery/utils.rs +++ b/src/meta-srv/src/discovery/utils.rs @@ -12,12 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; use std::time::Duration; use api::v1::meta::heartbeat_request::NodeWorkloads; use common_meta::DatanodeId; -use common_meta::cluster::{DatanodeStatus, NodeInfo, NodeStatus}; +use common_meta::cluster::{NodeInfo, NodeStatus}; use common_meta::kv_backend::KvBackendRef; use common_meta::peer::Peer; use common_time::util::SystemTimer; @@ -64,82 +63,24 @@ pub async fn alive_frontend_infos( lister: &impl NodeInfoAccessor, active_duration: Duration, ) -> Result> { - let active_filter = build_active_filter(active_duration); - let node_infos = lister.node_infos(NodeInfoType::Frontend).await?; - let now = timer.current_time_millis(); - Ok(node_infos - .into_iter() - .filter_map(|(_, node_info)| active_filter(now, &node_info).then_some(node_info)) - .collect::>()) + alive_node_infos(timer, lister, NodeInfoType::Frontend, active_duration, None).await } /// Returns the alive datanode node infos. pub async fn alive_datanode_infos( timer: &impl SystemTimer, - accessor: &(impl LeaseValueAccessor + NodeInfoAccessor), + lister: &impl NodeInfoAccessor, active_duration: Duration, condition: Option bool>, ) -> Result> { - let active_filter = build_active_filter(active_duration); - let condition = condition.unwrap_or(|_| true); - let lease_values = accessor.lease_values(LeaseValueType::Datanode).await?; - let mut node_infos = accessor - .node_infos(NodeInfoType::Datanode) - .await? - .into_iter() - .collect::>(); - let now = timer.current_time_millis(); - - Ok(lease_values - .into_iter() - .filter_map(|(peer_id, lease_value)| { - if !active_filter(now, &lease_value) || !condition(&lease_value.workloads) { - return None; - } - - let peer = Peer::new(peer_id, lease_value.node_addr.clone()); - let mut node_info = node_infos - .remove(&peer_id) - .filter(|node_info| matches!(node_info.status, NodeStatus::Datanode(_))) - .unwrap_or_else(|| datanode_node_info_from_lease(peer.clone(), &lease_value)); - node_info.peer = peer; - node_info.last_activity_ts = lease_value.timestamp_millis; - if let (NodeStatus::Datanode(status), NodeWorkloads::Datanode(workloads)) = - (&mut node_info.status, &lease_value.workloads) - { - status.workloads = workloads.clone(); - } - Some(node_info) - }) - .collect::>()) -} - -fn datanode_node_info_from_lease(peer: Peer, lease_value: &LeaseValue) -> NodeInfo { - let workloads = match &lease_value.workloads { - NodeWorkloads::Datanode(workloads) => workloads.clone(), - _ => Default::default(), - }; - - NodeInfo { - peer, - last_activity_ts: lease_value.timestamp_millis, - status: NodeStatus::Datanode(DatanodeStatus { - rcus: 0, - wcus: 0, - leader_regions: 0, - follower_regions: 0, - workloads, - }), - version: String::new(), - git_commit: String::new(), - start_time_ms: 0, - total_cpu_millicores: 0, - total_memory_bytes: 0, - cpu_usage_millicores: 0, - memory_usage_bytes: 0, - hostname: String::new(), - env_vars: Default::default(), - } + alive_node_infos( + timer, + lister, + NodeInfoType::Datanode, + active_duration, + condition, + ) + .await } /// Returns the alive flownode node infos. @@ -177,6 +118,11 @@ async fn alive_node_infos( } match (&node_info.status, condition) { + (NodeStatus::Frontend(_), None) => Some(node_info), + (NodeStatus::Frontend(status), Some(condition)) => { + let workloads = NodeWorkloads::Frontend(status.workloads.clone()); + condition(&workloads).then_some(node_info) + } (NodeStatus::Datanode(status), Some(condition)) => { let workloads = NodeWorkloads::Datanode(status.workloads.clone()); condition(&workloads).then_some(node_info) diff --git a/src/meta-srv/src/handler/collect_cluster_info_handler.rs b/src/meta-srv/src/handler/collect_cluster_info_handler.rs index c4d73477b6..6bee86ad71 100644 --- a/src/meta-srv/src/handler/collect_cluster_info_handler.rs +++ b/src/meta-srv/src/handler/collect_cluster_info_handler.rs @@ -19,7 +19,7 @@ use common_meta::cluster::{ DatanodeStatus, FlownodeStatus, FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus, }; use common_meta::datanode::EnvVars; -use common_meta::heartbeat::utils::get_flownode_workloads; +use common_meta::heartbeat::utils::{get_flownode_workloads, get_frontend_workloads}; use common_meta::peer::Peer; use common_meta::rpc::store::PutRequest; use common_telemetry::warn; @@ -50,10 +50,14 @@ impl HeartbeatHandler for CollectFrontendClusterInfoHandler { return Ok(HandleControl::Continue); }; + let frontend_workloads = get_frontend_workloads(req.node_workloads.as_ref()); + let value = NodeInfo { peer, last_activity_ts: common_time::util::current_time_millis(), - status: NodeStatus::Frontend(FrontendStatus {}), + status: NodeStatus::Frontend(FrontendStatus { + workloads: frontend_workloads, + }), version: info.version, git_commit: info.git_commit, start_time_ms: info.start_time_ms,