diff --git a/src/common/meta/src/peer.rs b/src/common/meta/src/peer.rs index f8b0f275b9..834573b98b 100644 --- a/src/common/meta/src/peer.rs +++ b/src/common/meta/src/peer.rs @@ -19,11 +19,17 @@ pub use api::v1::meta::Peer; use crate::error::Error; use crate::{DatanodeId, FlownodeId}; -/// can query peer given a node id +/// PeerLookupService is a service that can lookup peers. #[async_trait::async_trait] pub trait PeerLookupService { + /// Returns the datanode with the given id. It may return inactive peers. async fn datanode(&self, id: DatanodeId) -> Result, Error>; + + /// Returns the flownode with the given id. It may return inactive peers. async fn flownode(&self, id: FlownodeId) -> Result, Error>; + + /// Returns all currently active frontend nodes that have reported a heartbeat within the most recent heartbeat interval from the in-memory backend. + async fn active_frontends(&self) -> Result, Error>; } pub type PeerLookupServiceRef = Arc; diff --git a/src/common/meta/src/test_util.rs b/src/common/meta/src/test_util.rs index 8dc4a38643..491a4e70c6 100644 --- a/src/common/meta/src/test_util.rs +++ b/src/common/meta/src/test_util.rs @@ -213,6 +213,10 @@ impl PeerLookupService for NoopPeerLookupService { async fn flownode(&self, id: FlownodeId) -> Result> { Ok(Some(Peer::empty(id))) } + + async fn active_frontends(&self) -> Result> { + Ok(vec![]) + } } /// Create a kafka topic pool for testing. diff --git a/src/meta-srv/src/lease.rs b/src/meta-srv/src/lease.rs index 26a194c035..ec6244450c 100644 --- a/src/meta-srv/src/lease.rs +++ b/src/meta-srv/src/lease.rs @@ -20,8 +20,11 @@ use std::task::{Context, Poll}; use api::v1::meta::heartbeat_request::NodeWorkloads; use common_error::ext::BoxedError; +use common_meta::cluster::{NodeInfo, NodeInfoKey, Role as ClusterRole}; +use common_meta::distributed_time_constants::FRONTEND_HEARTBEAT_INTERVAL_MILLIS; use common_meta::kv_backend::{KvBackend, ResettableKvBackendRef}; use common_meta::peer::{Peer, PeerLookupService}; +use common_meta::rpc::store::RangeRequest; use common_meta::{util, DatanodeId, FlownodeId}; use common_time::util as time_util; use common_workload::DatanodeWorkloadType; @@ -31,10 +34,19 @@ use crate::cluster::MetaPeerClientRef; use crate::error::{Error, KvBackendSnafu, Result}; use crate::key::{DatanodeLeaseKey, FlownodeLeaseKey, LeaseValue}; -fn build_lease_filter(lease_secs: u64) -> impl Fn(&LeaseValue) -> bool { - move |v: &LeaseValue| { - ((time_util::current_time_millis() - v.timestamp_millis) as u64) - < lease_secs.saturating_mul(1000) +enum Value<'a> { + LeaseValue(&'a LeaseValue), + NodeInfo(&'a NodeInfo), +} + +fn build_lease_filter(lease_secs: u64) -> impl Fn(Value) -> bool { + move |value: Value| { + let active_time = match value { + Value::LeaseValue(lease_value) => lease_value.timestamp_millis, + Value::NodeInfo(node_info) => node_info.last_activity_ts, + }; + + ((time_util::current_time_millis() - active_time) as u64) < lease_secs.saturating_mul(1000) } } @@ -91,7 +103,7 @@ pub async fn lookup_datanode_peer( return Ok(None); }; let lease_value: LeaseValue = kv.value.try_into()?; - let is_alive = lease_filter(&lease_value); + let is_alive = lease_filter(Value::LeaseValue(&lease_value)); if is_alive { Ok(Some(Peer { id: lease_key.node_id, @@ -155,7 +167,7 @@ where let condition = this.condition; let key_prefix = std::mem::take(&mut this.key_prefix); let fut = filter(key_prefix, this.meta_peer_client, move |v| { - lease_filter(v) && condition.unwrap_or(|_| true)(v) + lease_filter(Value::LeaseValue(v)) && condition.unwrap_or(|_| true)(v) }); this.inner_future = Some(Box::pin(fut)); @@ -192,7 +204,7 @@ pub async fn lookup_flownode_peer( }; let lease_value: LeaseValue = kv.value.try_into()?; - let is_alive = lease_filter(&lease_value); + let is_alive = lease_filter(Value::LeaseValue(&lease_value)); if is_alive { Ok(Some(Peer { id: lease_key.node_id, @@ -203,6 +215,29 @@ pub async fn lookup_flownode_peer( } } +/// Lookup all alive frontends from the memory backend, only return if it's alive under given `lease_secs`. +pub async fn lookup_frontends( + meta_peer_client: &MetaPeerClientRef, + lease_secs: u64, +) -> Result> { + let range_request = + RangeRequest::new().with_prefix(NodeInfoKey::key_prefix_with_role(ClusterRole::Frontend)); + + let response = meta_peer_client.range(range_request).await?; + let lease_filter = build_lease_filter(lease_secs); + + let mut peers = Vec::with_capacity(response.kvs.len()); + for kv in response.kvs { + let node_info = NodeInfo::try_from(kv.value).context(KvBackendSnafu)?; + let is_alive = lease_filter(Value::NodeInfo(&node_info)); + if is_alive { + peers.push(node_info.peer); + } + } + + Ok(peers) +} + /// Find all alive flownodes pub fn alive_flownodes( meta_peer_client: &MetaPeerClientRef, @@ -264,25 +299,42 @@ impl PeerLookupService for MetaPeerLookupService { .map_err(BoxedError::new) .context(common_meta::error::ExternalSnafu) } + async fn flownode(&self, id: FlownodeId) -> common_meta::error::Result> { lookup_flownode_peer(id, &self.meta_peer_client, u64::MAX) .await .map_err(BoxedError::new) .context(common_meta::error::ExternalSnafu) } + + async fn active_frontends(&self) -> common_meta::error::Result> { + // Get the active frontends within the last heartbeat interval. + lookup_frontends( + &self.meta_peer_client, + // TODO(zyy17): How to get the heartbeat interval of the frontend if it uses a custom heartbeat interval? + FRONTEND_HEARTBEAT_INTERVAL_MILLIS, + ) + .await + .map_err(BoxedError::new) + .context(common_meta::error::ExternalSnafu) + } } #[cfg(test)] mod tests { use api::v1::meta::heartbeat_request::NodeWorkloads; use api::v1::meta::DatanodeWorkloads; + use common_meta::cluster::{FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus}; use common_meta::kv_backend::ResettableKvBackendRef; + use common_meta::peer::Peer; use common_meta::rpc::store::PutRequest; use common_time::util::current_time_millis; use common_workload::DatanodeWorkloadType; use crate::key::{DatanodeLeaseKey, LeaseValue}; - use crate::lease::{alive_datanodes, is_datanode_accept_ingest_workload}; + use crate::lease::{ + alive_datanodes, is_datanode_accept_ingest_workload, lookup_frontends, ClusterRole, + }; use crate::test_util::create_meta_peer_client; async fn put_lease_value( @@ -391,4 +443,60 @@ mod tests { assert_eq!(leases.len(), 1); assert!(leases.contains_key(&DatanodeLeaseKey { node_id: 2 })); } + + #[tokio::test] + async fn test_lookup_frontends() { + let client = create_meta_peer_client(); + let in_memory = client.memory_backend(); + let lease_secs = 10; + + let active_frontend_node = NodeInfo { + peer: Peer { + id: 0, + addr: "127.0.0.1:20201".to_string(), + }, + last_activity_ts: current_time_millis(), + status: NodeStatus::Frontend(FrontendStatus {}), + version: "1.0.0".to_string(), + git_commit: "1234567890".to_string(), + start_time_ms: current_time_millis() as u64, + }; + + let key_prefix = NodeInfoKey::key_prefix_with_role(ClusterRole::Frontend); + + in_memory + .put(PutRequest { + key: format!("{}{}", key_prefix, "0").into(), + value: active_frontend_node.try_into().unwrap(), + prev_kv: false, + }) + .await + .unwrap(); + + let inactive_frontend_node = NodeInfo { + peer: Peer { + id: 1, + addr: "127.0.0.1:20201".to_string(), + }, + last_activity_ts: current_time_millis() - 20 * 1000, + status: NodeStatus::Frontend(FrontendStatus {}), + version: "1.0.0".to_string(), + git_commit: "1234567890".to_string(), + start_time_ms: current_time_millis() as u64, + }; + + in_memory + .put(PutRequest { + key: format!("{}{}", key_prefix, "1").into(), + value: inactive_frontend_node.try_into().unwrap(), + prev_kv: false, + }) + .await + .unwrap(); + + let peers = lookup_frontends(&client, lease_secs as u64).await.unwrap(); + + assert_eq!(peers.len(), 1); + assert_eq!(peers[0].id, 0); + } }