refactor: use node info for active discovery (#8121)

* refactor: use node info for active discovery

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: clippy

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: fix unit tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2026-05-20 15:31:01 +08:00
committed by GitHub
parent f8df016623
commit 2f1ca88f30
7 changed files with 60 additions and 176 deletions

View File

@@ -17,7 +17,7 @@ use std::fmt::{Display, Formatter};
use std::hash::{DefaultHasher, Hash, Hasher};
use std::str::FromStr;
use api::v1::meta::{DatanodeWorkloads, FlownodeWorkloads, HeartbeatRequest};
use api::v1::meta::{DatanodeWorkloads, FlownodeWorkloads, FrontendWorkloads, HeartbeatRequest};
use common_error::ext::ErrorExt;
use lazy_static::lazy_static;
use regex::Regex;
@@ -186,8 +186,12 @@ pub struct DatanodeStatus {
}
/// The status of a frontend.
#[derive(Debug, Serialize, Deserialize)]
pub struct FrontendStatus {}
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct FrontendStatus {
/// The workloads of the frontend.
///
/// `#[serde(default)]` lets serialized data that lacks this field still
/// deserialize; the field then takes `FrontendWorkloads::default()`.
#[serde(default)]
pub workloads: FrontendWorkloads,
}
/// The status of a flownode.
#[derive(Debug, Serialize, Deserialize)]

View File

@@ -14,7 +14,7 @@
use api::v1::meta::heartbeat_request::NodeWorkloads;
use api::v1::meta::mailbox_message::Payload;
use api::v1::meta::{DatanodeWorkloads, FlownodeWorkloads, MailboxMessage};
use api::v1::meta::{DatanodeWorkloads, FlownodeWorkloads, FrontendWorkloads, MailboxMessage};
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::warn;
use common_time::util::current_time_millis;
@@ -90,6 +90,16 @@ pub fn get_datanode_workloads(node_workloads: Option<&NodeWorkloads>) -> Datanod
}
}
/// Pulls the frontend workloads out of an optional [`NodeWorkloads`].
///
/// A frontend payload is cloned and returned as-is. For `None`, or for a
/// payload belonging to any other node kind, the result carries an empty
/// `types` list.
pub fn get_frontend_workloads(node_workloads: Option<&NodeWorkloads>) -> FrontendWorkloads {
    if let Some(NodeWorkloads::Frontend(workloads)) = node_workloads {
        return workloads.clone();
    }

    // Missing payload or a non-frontend payload: report no workloads.
    FrontendWorkloads { types: Vec::new() }
}
/// Extracts flownode workloads from the provided optional `NodeWorkloads`.
///
/// Returns empty flownode workloads if the input is `None` or not a flownode payload.
@@ -124,4 +134,16 @@ mod tests {
let workloads = get_flownode_workloads(None);
assert!(workloads.types.is_empty());
}
#[test]
fn test_get_frontend_workloads() {
    // A frontend payload is handed back with its workload types intact.
    let frontend_payload = NodeWorkloads::Frontend(FrontendWorkloads { types: vec![7] });
    let extracted = get_frontend_workloads(Some(&frontend_payload));
    assert_eq!(extracted.types, vec![7]);

    // No payload at all yields an empty workload list.
    let extracted = get_frontend_workloads(None);
    assert!(extracted.types.is_empty());
}
}

View File

@@ -17,7 +17,8 @@ mod tests;
use std::sync::Arc;
use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer};
use api::v1::meta::heartbeat_request::NodeWorkloads;
use api::v1::meta::{FrontendWorkloads, HeartbeatRequest, NodeInfo, Peer};
use common_meta::datanode::EnvVars;
use common_meta::heartbeat::handler::{
HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
@@ -214,6 +215,7 @@ impl HeartbeatTask {
total_cpu_millicores,
total_memory_bytes,
),
node_workloads: Some(NodeWorkloads::Frontend(FrontendWorkloads { types: vec![] })),
extensions,
..Default::default()
};

View File

@@ -142,7 +142,7 @@ mod tests {
fn node_info(role: Role, id: u64, addr: &str, last_activity_ts: i64) -> NodeInfo {
let status = match role {
Role::Frontend => NodeStatus::Frontend(FrontendStatus {}),
Role::Frontend => NodeStatus::Frontend(FrontendStatus::default()),
Role::Datanode => NodeStatus::Datanode(DatanodeStatus {
rcus: 0,
wcus: 0,

View File

@@ -97,8 +97,7 @@ impl LeaseValueAccessor for MetaPeerClient {
mod tests {
use std::collections::HashMap;
use api::v1::meta::heartbeat_request::NodeWorkloads;
use api::v1::meta::{DatanodeWorkloads, FlownodeWorkloads};
use api::v1::meta::{DatanodeWorkloads, FlownodeWorkloads, FrontendWorkloads};
use common_meta::cluster::{
DatanodeStatus, FlownodeStatus, FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus, Role,
};
@@ -110,24 +109,8 @@ mod tests {
use common_workload::DatanodeWorkloadType;
use crate::discovery::utils::accept_ingest_workload;
use crate::key::{DatanodeLeaseKey, LeaseValue};
use crate::test_util::create_meta_peer_client;
/// Test helper: stores `value` under `key` in `kv_backend`.
///
/// Both the key and the value are converted with `try_into`; a failed
/// conversion or a failed `put` panics, which fails the calling test.
async fn put_lease_value(
    kv_backend: &ResettableKvBackendRef,
    key: DatanodeLeaseKey,
    value: LeaseValue,
) {
    let request = PutRequest {
        key: key.try_into().unwrap(),
        value: value.try_into().unwrap(),
        prev_kv: false,
    };
    kv_backend.put(request).await.unwrap();
}
async fn put_node_info(kv_backend: &ResettableKvBackendRef, key: NodeInfoKey, value: NodeInfo) {
kv_backend
.put(PutRequest {
@@ -139,72 +122,6 @@ mod tests {
.unwrap();
}
/// Stores a datanode node info whose `last_activity_ts` lies two lease
/// durations in the past, plus a lease value stamped with the current time,
/// then checks that `active_datanodes` still returns the node together with
/// the env vars that were recorded only in the node info.
#[tokio::test]
async fn test_active_datanodes_uses_lease_liveness_with_stale_node_info() {
let client = create_meta_peer_client();
let in_memory = client.memory_backend();
let lease = default_distributed_time_constants().datanode_lease;
let mut env_vars = HashMap::new();
env_vars.insert("AZ".to_string(), "az-a".to_string());
// Node info: activity timestamp is older than the lease, and the workload
// type is `i32::MAX` (presumably one `accept_ingest_workload` rejects —
// TODO confirm against that helper).
put_node_info(
&in_memory,
NodeInfoKey {
role: Role::Datanode,
node_id: 1,
},
NodeInfo {
peer: Peer::new(1, "127.0.0.1:4001".to_string()),
last_activity_ts: current_time_millis() - (lease.as_millis() * 2) as i64,
status: NodeStatus::Datanode(DatanodeStatus {
rcus: 0,
wcus: 0,
leader_regions: 0,
follower_regions: 0,
workloads: DatanodeWorkloads {
types: vec![i32::MAX],
},
}),
version: String::new(),
git_commit: String::new(),
start_time_ms: 0,
total_cpu_millicores: 0,
total_memory_bytes: 0,
cpu_usage_millicores: 0,
memory_usage_bytes: 0,
hostname: String::new(),
env_vars,
},
)
.await;
// Lease value for the same node: current timestamp and a `Hybrid` workload.
put_lease_value(
&in_memory,
DatanodeLeaseKey { node_id: 1 },
LeaseValue {
timestamp_millis: current_time_millis(),
node_addr: "127.0.0.1:4001".to_string(),
workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
types: vec![DatanodeWorkloadType::Hybrid as i32],
}),
},
)
.await;
let nodes = client
.active_datanodes(Some(accept_ingest_workload))
.await
.unwrap();
// Exactly one node comes back: id 1, carrying the "AZ" env var that was
// written with the node info above.
assert_eq!(nodes.len(), 1);
assert_eq!(nodes[0].peer.id, 1);
assert_eq!(
nodes[0].env_vars.get("AZ").map(String::as_str),
Some("az-a")
);
}
#[tokio::test]
async fn test_active_datanodes_returns_node_info_with_env_vars() {
let client = create_meta_peer_client();
@@ -243,19 +160,6 @@ mod tests {
},
)
.await;
put_lease_value(
&in_memory,
DatanodeLeaseKey { node_id: 1 },
LeaseValue {
timestamp_millis: current_time_millis(),
node_addr: "127.0.0.1:4001".to_string(),
workloads: NodeWorkloads::Datanode(DatanodeWorkloads {
types: vec![DatanodeWorkloadType::Hybrid as i32],
}),
},
)
.await;
let nodes = client
.active_datanodes(Some(accept_ingest_workload))
.await
@@ -320,7 +224,9 @@ mod tests {
addr: "127.0.0.1:20201".to_string(),
},
last_activity_ts,
status: NodeStatus::Frontend(FrontendStatus {}),
status: NodeStatus::Frontend(FrontendStatus {
workloads: FrontendWorkloads { types: vec![] },
}),
version: "1.0.0".to_string(),
git_commit: "1234567890".to_string(),
start_time_ms: last_activity_ts as u64,

View File

@@ -12,12 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::time::Duration;
use api::v1::meta::heartbeat_request::NodeWorkloads;
use common_meta::DatanodeId;
use common_meta::cluster::{DatanodeStatus, NodeInfo, NodeStatus};
use common_meta::cluster::{NodeInfo, NodeStatus};
use common_meta::kv_backend::KvBackendRef;
use common_meta::peer::Peer;
use common_time::util::SystemTimer;
@@ -64,82 +63,24 @@ pub async fn alive_frontend_infos(
lister: &impl NodeInfoAccessor,
active_duration: Duration,
) -> Result<Vec<NodeInfo>> {
let active_filter = build_active_filter(active_duration);
let node_infos = lister.node_infos(NodeInfoType::Frontend).await?;
let now = timer.current_time_millis();
Ok(node_infos
.into_iter()
.filter_map(|(_, node_info)| active_filter(now, &node_info).then_some(node_info))
.collect::<Vec<_>>())
alive_node_infos(timer, lister, NodeInfoType::Frontend, active_duration, None).await
}
/// Returns the alive datanode node infos.
pub async fn alive_datanode_infos(
timer: &impl SystemTimer,
accessor: &(impl LeaseValueAccessor + NodeInfoAccessor),
lister: &impl NodeInfoAccessor,
active_duration: Duration,
condition: Option<fn(&NodeWorkloads) -> bool>,
) -> Result<Vec<NodeInfo>> {
let active_filter = build_active_filter(active_duration);
let condition = condition.unwrap_or(|_| true);
let lease_values = accessor.lease_values(LeaseValueType::Datanode).await?;
let mut node_infos = accessor
.node_infos(NodeInfoType::Datanode)
.await?
.into_iter()
.collect::<HashMap<_, _>>();
let now = timer.current_time_millis();
Ok(lease_values
.into_iter()
.filter_map(|(peer_id, lease_value)| {
if !active_filter(now, &lease_value) || !condition(&lease_value.workloads) {
return None;
}
let peer = Peer::new(peer_id, lease_value.node_addr.clone());
let mut node_info = node_infos
.remove(&peer_id)
.filter(|node_info| matches!(node_info.status, NodeStatus::Datanode(_)))
.unwrap_or_else(|| datanode_node_info_from_lease(peer.clone(), &lease_value));
node_info.peer = peer;
node_info.last_activity_ts = lease_value.timestamp_millis;
if let (NodeStatus::Datanode(status), NodeWorkloads::Datanode(workloads)) =
(&mut node_info.status, &lease_value.workloads)
{
status.workloads = workloads.clone();
}
Some(node_info)
})
.collect::<Vec<_>>())
}
fn datanode_node_info_from_lease(peer: Peer, lease_value: &LeaseValue) -> NodeInfo {
let workloads = match &lease_value.workloads {
NodeWorkloads::Datanode(workloads) => workloads.clone(),
_ => Default::default(),
};
NodeInfo {
peer,
last_activity_ts: lease_value.timestamp_millis,
status: NodeStatus::Datanode(DatanodeStatus {
rcus: 0,
wcus: 0,
leader_regions: 0,
follower_regions: 0,
workloads,
}),
version: String::new(),
git_commit: String::new(),
start_time_ms: 0,
total_cpu_millicores: 0,
total_memory_bytes: 0,
cpu_usage_millicores: 0,
memory_usage_bytes: 0,
hostname: String::new(),
env_vars: Default::default(),
}
alive_node_infos(
timer,
lister,
NodeInfoType::Datanode,
active_duration,
condition,
)
.await
}
/// Returns the alive flownode node infos.
@@ -177,6 +118,11 @@ async fn alive_node_infos(
}
match (&node_info.status, condition) {
(NodeStatus::Frontend(_), None) => Some(node_info),
(NodeStatus::Frontend(status), Some(condition)) => {
let workloads = NodeWorkloads::Frontend(status.workloads.clone());
condition(&workloads).then_some(node_info)
}
(NodeStatus::Datanode(status), Some(condition)) => {
let workloads = NodeWorkloads::Datanode(status.workloads.clone());
condition(&workloads).then_some(node_info)

View File

@@ -19,7 +19,7 @@ use common_meta::cluster::{
DatanodeStatus, FlownodeStatus, FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus,
};
use common_meta::datanode::EnvVars;
use common_meta::heartbeat::utils::get_flownode_workloads;
use common_meta::heartbeat::utils::{get_flownode_workloads, get_frontend_workloads};
use common_meta::peer::Peer;
use common_meta::rpc::store::PutRequest;
use common_telemetry::warn;
@@ -50,10 +50,14 @@ impl HeartbeatHandler for CollectFrontendClusterInfoHandler {
return Ok(HandleControl::Continue);
};
let frontend_workloads = get_frontend_workloads(req.node_workloads.as_ref());
let value = NodeInfo {
peer,
last_activity_ts: common_time::util::current_time_millis(),
status: NodeStatus::Frontend(FrontendStatus {}),
status: NodeStatus::Frontend(FrontendStatus {
workloads: frontend_workloads,
}),
version: info.version,
git_commit: info.git_commit,
start_time_ms: info.start_time_ms,