diff --git a/Cargo.lock b/Cargo.lock
index f72fb8fd0d..4c53d78e2e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7918,6 +7918,7 @@ dependencies = [
"common-meta",
"common-options",
"common-telemetry",
+ "common-time",
"datatypes",
"futures",
"futures-util",
diff --git a/config/config.md b/config/config.md
index 0a106cbeea..01cf9e75fa 100644
--- a/config/config.md
+++ b/config/config.md
@@ -633,7 +633,6 @@
| `flow.batching_mode.grpc_conn_timeout` | String | `5s` | The gRPC connection timeout |
| `flow.batching_mode.experimental_grpc_max_retries` | Integer | `3` | The gRPC max retry number |
| `flow.batching_mode.experimental_frontend_scan_timeout` | String | `30s` | Flow wait for available frontend timeout,<br/>if failed to find available frontend after frontend_scan_timeout elapsed, return error<br/>which prevent flownode from starting |
-| `flow.batching_mode.experimental_frontend_activity_timeout` | String | `60s` | Frontend activity timeout<br/>if frontend is down(not sending heartbeat) for more than frontend_activity_timeout,<br/>it will be removed from the list that flownode use to connect |
| `flow.batching_mode.experimental_max_filter_num_per_query` | Integer | `20` | Maximum number of filters allowed in a single query |
| `flow.batching_mode.experimental_time_window_merge_threshold` | Integer | `3` | Time window merge distance |
| `flow.batching_mode.read_preference` | String | `Leader` | Read preference of the Frontend client. |
diff --git a/config/flownode.example.toml b/config/flownode.example.toml
index b13acfc447..5519de1b89 100644
--- a/config/flownode.example.toml
+++ b/config/flownode.example.toml
@@ -22,10 +22,6 @@ node_id = 14
## if failed to find available frontend after frontend_scan_timeout elapsed, return error
## which prevent flownode from starting
#+experimental_frontend_scan_timeout="30s"
-## Frontend activity timeout
-## if frontend is down(not sending heartbeat) for more than frontend_activity_timeout,
-## it will be removed from the list that flownode use to connect
-#+experimental_frontend_activity_timeout="60s"
## Maximum number of filters allowed in a single query
#+experimental_max_filter_num_per_query=20
## Time window merge distance
diff --git a/src/catalog/src/process_manager.rs b/src/catalog/src/process_manager.rs
index 6a3ae31b25..49be14933c 100644
--- a/src/catalog/src/process_manager.rs
+++ b/src/catalog/src/process_manager.rs
@@ -173,7 +173,7 @@ impl ProcessManager {
let mut processes = vec![];
if let Some(remote_frontend_selector) = self.frontend_selector.as_ref() {
let frontends = remote_frontend_selector
- .select(|node| node.peer.addr != self.server_addr)
+ .select(|peer| peer.addr != self.server_addr)
.await
.context(error::InvokeFrontendSnafu)?;
for mut f in frontends {
@@ -211,7 +211,7 @@ impl ProcessManager {
.frontend_selector
.as_ref()
.context(error::MetaClientMissingSnafu)?
- .select(|node| node.peer.addr == server_addr)
+ .select(|peer| peer.addr == server_addr)
.await
.context(error::InvokeFrontendSnafu)?;
ensure!(
diff --git a/src/common/frontend/src/error.rs b/src/common/frontend/src/error.rs
index 429489326c..19fb29aeb7 100644
--- a/src/common/frontend/src/error.rs
+++ b/src/common/frontend/src/error.rs
@@ -30,7 +30,7 @@ pub enum Error {
#[snafu(display("Failed to list nodes from metasrv"))]
Meta {
- source: Box<meta_client::error::Error>,
+ source: Box<common_meta::error::Error>,
#[snafu(implicit)]
location: Location,
},
diff --git a/src/common/frontend/src/selector.rs b/src/common/frontend/src/selector.rs
index 5bbd8bb52c..bba36ef81b 100644
--- a/src/common/frontend/src/selector.rs
+++ b/src/common/frontend/src/selector.rs
@@ -16,7 +16,7 @@ use std::fmt::Debug;
use std::time::Duration;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
-use common_meta::cluster::{ClusterInfo, NodeInfo, Role};
+use common_meta::peer::{Peer, PeerDiscovery};
use greptime_proto::v1::frontend::{
KillProcessRequest, KillProcessResponse, ListProcessRequest, ListProcessResponse,
frontend_client,
@@ -62,7 +62,7 @@ impl FrontendClient for frontend_client::FrontendClient(&self, predicate: F) -> Result>
where
- F: Fn(&NodeInfo) -> bool + Send;
+ F: Fn(&Peer) -> bool + Send;
}
#[derive(Debug, Clone)]
@@ -75,22 +75,22 @@ pub struct MetaClientSelector {
impl FrontendSelector for MetaClientSelector {
async fn select(&self, predicate: F) -> Result>
where
- F: Fn(&NodeInfo) -> bool + Send,
+ F: Fn(&Peer) -> bool + Send,
{
- let nodes = self
+ let peers = self
.meta_client
- .list_nodes(Some(Role::Frontend))
+ .active_frontends()
.await
.map_err(Box::new)
.context(MetaSnafu)?;
- nodes
+ peers
.into_iter()
.filter(predicate)
- .map(|node| {
+ .map(|peer| {
let channel = self
.channel_manager
- .get(node.peer.addr)
+ .get(peer.addr)
.map_err(Box::new)
.context(error::CreateChannelSnafu)?;
let client = frontend_client::FrontendClient::new(channel);
diff --git a/src/common/meta/src/cluster.rs b/src/common/meta/src/cluster.rs
index 527ad589f1..40ee8b0e2f 100644
--- a/src/common/meta/src/cluster.rs
+++ b/src/common/meta/src/cluster.rs
@@ -16,7 +16,7 @@ use std::fmt::{Display, Formatter};
use std::hash::{DefaultHasher, Hash, Hasher};
use std::str::FromStr;
-use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest};
+use api::v1::meta::{DatanodeWorkloads, FlownodeWorkloads, HeartbeatRequest};
use common_error::ext::ErrorExt;
use lazy_static::lazy_static;
use regex::Regex;
@@ -187,7 +187,11 @@ pub struct FrontendStatus {}
/// The status of a flownode.
#[derive(Debug, Serialize, Deserialize)]
-pub struct FlownodeStatus {}
+pub struct FlownodeStatus {
+ /// The workloads of the flownode.
+ #[serde(default)]
+ pub workloads: FlownodeWorkloads,
+}
/// The status of a metasrv.
#[derive(Debug, Serialize, Deserialize)]
@@ -309,7 +313,7 @@ mod tests {
use super::*;
use crate::cluster::Role::{Datanode, Frontend};
- use crate::cluster::{DatanodeStatus, NodeInfo, NodeInfoKey, NodeStatus};
+ use crate::cluster::{DatanodeStatus, FlownodeStatus, NodeInfo, NodeInfoKey, NodeStatus};
use crate::peer::Peer;
#[test]
@@ -404,4 +408,60 @@ mod tests {
let id4 = calculate_node_id(long_addr);
assert!(id4 > 0);
}
+
+ #[test]
+ fn test_flownode_status_backward_compatible_without_workloads() {
+ let raw = r#"{
+ "peer":{"id":1,"addr":"127.0.0.1"},
+ "last_activity_ts":123,
+ "status":{"Flownode":{}},
+ "version":"",
+ "git_commit":"",
+ "start_time_ms":1,
+ "total_cpu_millicores":0,
+ "total_memory_bytes":0,
+ "cpu_usage_millicores":0,
+ "memory_usage_bytes":0,
+ "hostname":""
+ }"#;
+
+ let node_info: NodeInfo = raw.parse().unwrap();
+ assert_matches!(
+ node_info.status,
+ NodeStatus::Flownode(FlownodeStatus { workloads }) if workloads.types.is_empty()
+ );
+ }
+
+ #[test]
+ fn test_flownode_status_round_trip_with_workloads() {
+ let node_info = NodeInfo {
+ peer: Peer {
+ id: 1,
+ addr: "127.0.0.1".to_string(),
+ },
+ last_activity_ts: 123,
+ status: NodeStatus::Flownode(FlownodeStatus {
+ workloads: FlownodeWorkloads { types: vec![7] },
+ }),
+ version: "".to_string(),
+ git_commit: "".to_string(),
+ start_time_ms: 1,
+ total_cpu_millicores: 0,
+ total_memory_bytes: 0,
+ cpu_usage_millicores: 0,
+ memory_usage_bytes: 0,
+ hostname: "test_hostname".to_string(),
+ };
+
+ let node_info_bytes: Vec<u8> = node_info.try_into().unwrap();
+ let new_node_info: NodeInfo = node_info_bytes.try_into().unwrap();
+
+ assert_matches!(
+ new_node_info,
+ NodeInfo {
+ status: NodeStatus::Flownode(FlownodeStatus { workloads }),
+ ..
+ } if workloads.types == vec![7]
+ );
+ }
}
diff --git a/src/common/meta/src/heartbeat/utils.rs b/src/common/meta/src/heartbeat/utils.rs
index c037f6306f..f1c3eec1af 100644
--- a/src/common/meta/src/heartbeat/utils.rs
+++ b/src/common/meta/src/heartbeat/utils.rs
@@ -14,7 +14,7 @@
use api::v1::meta::heartbeat_request::NodeWorkloads;
use api::v1::meta::mailbox_message::Payload;
-use api::v1::meta::{DatanodeWorkloads, MailboxMessage};
+use api::v1::meta::{DatanodeWorkloads, FlownodeWorkloads, MailboxMessage};
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::warn;
use common_time::util::current_time_millis;
@@ -90,6 +90,16 @@ pub fn get_datanode_workloads(node_workloads: Option<&NodeWorkloads>) -> Datanod
}
}
+/// Extracts flownode workloads from the provided optional `NodeWorkloads`.
+///
+/// Returns empty flownode workloads if the input is `None` or not a flownode payload.
+pub fn get_flownode_workloads(node_workloads: Option<&NodeWorkloads>) -> FlownodeWorkloads {
+ match node_workloads {
+ Some(NodeWorkloads::Flownode(flownode_workloads)) => flownode_workloads.clone(),
+ _ => FlownodeWorkloads { types: vec![] },
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
@@ -102,4 +112,16 @@ mod tests {
let workloads = get_datanode_workloads(node_workloads.as_ref());
assert_eq!(workloads.types, vec![DatanodeWorkloadType::Hybrid.to_i32()]);
}
+
+ #[test]
+ fn test_get_flownode_workloads() {
+ let node_workloads = Some(NodeWorkloads::Flownode(FlownodeWorkloads {
+ types: vec![7],
+ }));
+ let workloads = get_flownode_workloads(node_workloads.as_ref());
+ assert_eq!(workloads.types, vec![7]);
+
+ let workloads = get_flownode_workloads(None);
+ assert!(workloads.types.is_empty());
+ }
}
diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs
index 976df56c9e..53a3265d7d 100644
--- a/src/flow/src/adapter/flownode_impl.rs
+++ b/src/flow/src/adapter/flownode_impl.rs
@@ -213,7 +213,7 @@ impl FlowDualEngine {
if !frontend_list.is_empty() {
let fe_list = frontend_list
.iter()
- .map(|(_, info)| &info.peer.addr)
+ .map(|peer| &peer.addr)
.collect::<Vec<_>>();
info!("Available frontend found: {:?}", fe_list);
return Ok(());
diff --git a/src/flow/src/batching_mode.rs b/src/flow/src/batching_mode.rs
index 7817cfb6d0..4162daa20c 100644
--- a/src/flow/src/batching_mode.rs
+++ b/src/flow/src/batching_mode.rs
@@ -48,10 +48,6 @@ pub struct BatchingModeOptions {
/// which prevent flownode from starting
#[serde(with = "humantime_serde")]
pub experimental_frontend_scan_timeout: Duration,
- /// Frontend activity timeout
- /// if frontend is down(not sending heartbeat) for more than frontend_activity_timeout, it will be removed from the list that flownode use to connect
- #[serde(with = "humantime_serde")]
- pub experimental_frontend_activity_timeout: Duration,
/// Maximum number of filters allowed in a single query
pub experimental_max_filter_num_per_query: usize,
/// Time window merge distance
@@ -71,7 +67,6 @@ impl Default for BatchingModeOptions {
grpc_conn_timeout: Duration::from_secs(5),
experimental_grpc_max_retries: 3,
experimental_frontend_scan_timeout: Duration::from_secs(30),
- experimental_frontend_activity_timeout: Duration::from_secs(60),
experimental_max_filter_num_per_query: 20,
experimental_time_window_merge_threshold: 3,
read_preference: Default::default(),
diff --git a/src/flow/src/batching_mode/frontend_client.rs b/src/flow/src/batching_mode/frontend_client.rs
index 8ec6de3a08..9875564c78 100644
--- a/src/flow/src/batching_mode/frontend_client.rs
+++ b/src/flow/src/batching_mode/frontend_client.rs
@@ -16,7 +16,6 @@
use std::collections::HashMap;
use std::sync::{Arc, Mutex, Weak};
-use std::time::SystemTime;
use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
@@ -24,9 +23,7 @@ use api::v1::{CreateTableExpr, QueryRequest};
use client::{Client, Database};
use common_error::ext::BoxedError;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager, load_client_tls_config};
-use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
-use common_meta::peer::Peer;
-use common_meta::rpc::store::RangeRequest;
+use common_meta::peer::{Peer, PeerDiscovery};
use common_query::Output;
use common_telemetry::warn;
use meta_client::client::MetaClient;
@@ -200,38 +197,19 @@ impl DatabaseWithPeer {
impl FrontendClient {
/// scan for available frontend from metadata
- pub(crate) async fn scan_for_frontend(&self) -> Result<Vec<(NodeInfoKey, NodeInfo)>, Error> {
+ pub(crate) async fn scan_for_frontend(&self) -> Result<Vec<Peer>, Error> {
let Self::Distributed { meta_client, .. } = self else {
return Ok(vec![]);
};
- let cluster_client = meta_client
- .cluster_client()
- .map_err(BoxedError::new)
- .context(ExternalSnafu)?;
- let prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend);
- let req = RangeRequest::new().with_prefix(prefix);
- let resp = cluster_client
- .range(req)
+ meta_client
+ .active_frontends()
.await
.map_err(BoxedError::new)
- .context(ExternalSnafu)?;
- let mut res = Vec::with_capacity(resp.kvs.len());
- for kv in resp.kvs {
- let key = NodeInfoKey::try_from(kv.key)
- .map_err(BoxedError::new)
- .context(ExternalSnafu)?;
-
- let val = NodeInfo::try_from(kv.value)
- .map_err(BoxedError::new)
- .context(ExternalSnafu)?;
- res.push((key, val));
- }
- Ok(res)
+ .context(ExternalSnafu)
}
- /// Get the frontend with recent enough(less than 1 minute from now) `last_activity_ts`
- /// and is able to process query
+ /// Get a frontend discovered by metasrv and verified with a query probe.
async fn get_random_active_frontend(
&self,
catalog: &str,
@@ -255,26 +233,11 @@ impl FrontendClient {
interval.tick().await;
for retry in 0..batch_opts.experimental_grpc_max_retries {
let mut frontends = self.scan_for_frontend().await?;
- let now_in_ms = SystemTime::now()
- .duration_since(SystemTime::UNIX_EPOCH)
- .unwrap()
- .as_millis() as i64;
// shuffle the frontends to avoid always pick the same one
frontends.shuffle(&mut rng());
- // found node with maximum last_activity_ts
- for (_, node_info) in frontends
- .iter()
- // filter out frontend that have been down for more than 1 min
- .filter(|(_, node_info)| {
- node_info.last_activity_ts
- + batch_opts
- .experimental_frontend_activity_timeout
- .as_millis() as i64
- > now_in_ms
- })
- {
- let addr = &node_info.peer.addr;
+ for peer in frontends {
+ let addr = peer.addr.clone();
let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![addr.clone()]);
let database = {
let mut db = Database::new(catalog, schema, client);
@@ -283,7 +246,7 @@ impl FrontendClient {
}
db
};
- let db = DatabaseWithPeer::new(database, node_info.peer.clone());
+ let db = DatabaseWithPeer::new(database, peer);
match db.try_select_one().await {
Ok(_) => return Ok(db),
Err(e) => {
diff --git a/src/meta-client/Cargo.toml b/src/meta-client/Cargo.toml
index 26066c7b96..28c03b3f0d 100644
--- a/src/meta-client/Cargo.toml
+++ b/src/meta-client/Cargo.toml
@@ -17,6 +17,7 @@ common-macro.workspace = true
common-meta.workspace = true
common-options.workspace = true
common-telemetry.workspace = true
+common-time.workspace = true
futures.workspace = true
futures-util.workspace = true
humantime-serde.workspace = true
diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs
index 4a79105615..205226c845 100644
--- a/src/meta-client/src/client.rs
+++ b/src/meta-client/src/client.rs
@@ -26,6 +26,7 @@ use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;
+use api::v1::meta::heartbeat_request::NodeWorkloads;
use api::v1::meta::{
MetasrvNodeInfo, ProcedureDetailResponse, ReconcileRequest, ReconcileResponse, Role,
};
@@ -38,11 +39,13 @@ use common_meta::cluster::{
ClusterInfo, MetasrvStatus, NodeInfo, NodeInfoKey, NodeStatus, Role as ClusterRole,
};
use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, RegionStat};
+use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::error::{
self as meta_error, ExternalSnafu, Result as MetaResult, UnsupportedSnafu,
};
use common_meta::key::flow::flow_state::{FlowStat, FlowStateManager};
use common_meta::kv_backend::KvBackendRef;
+use common_meta::peer::{Peer, PeerDiscovery};
use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutor};
use common_meta::range_stream::PaginationStream;
use common_meta::rpc::KeyValue;
@@ -59,6 +62,7 @@ use common_meta::rpc::store::{
};
use common_options::plugin_options::PluginOptionsDeserializer;
use common_telemetry::info;
+use common_time::util::DefaultSystemTimer;
use config::Client as ConfigClient;
use futures::TryStreamExt;
use heartbeat::{Client as HeartbeatClient, HeartbeatConfig};
@@ -553,6 +557,60 @@ impl ClusterInfo for MetaClient {
}
}
+// TODO(weny): the discovery using client side timestamp may be inaccurate,
+// maybe we need to use the timestamp from metasrv in the future.
+#[async_trait::async_trait]
+impl PeerDiscovery for MetaClient {
+ async fn active_frontends(&self) -> MetaResult<Vec<Peer>> {
+ let nodes = self
+ .list_nodes(Some(ClusterRole::Frontend))
+ .await
+ .map_err(BoxedError::new)
+ .context(ExternalSnafu)?;
+ Ok(util::alive_frontends(
+ &DefaultSystemTimer,
+ nodes,
+ // TODO(weny): the heartbeat interval should be received from metasrv
+ // instead of using the default value.
+ default_distributed_time_constants().frontend_heartbeat_interval,
+ ))
+ }
+
+ async fn active_datanodes(
+ &self,
+ filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
+ ) -> MetaResult<Vec<Peer>> {
+ let nodes = self
+ .list_nodes(Some(ClusterRole::Datanode))
+ .await
+ .map_err(BoxedError::new)
+ .context(ExternalSnafu)?;
+ Ok(util::alive_datanodes(
+ &DefaultSystemTimer,
+ nodes,
+ default_distributed_time_constants().datanode_lease,
+ filter,
+ ))
+ }
+
+ async fn active_flownodes(
+ &self,
+ filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
+ ) -> MetaResult<Vec<Peer>> {
+ let nodes = self
+ .list_nodes(Some(ClusterRole::Flownode))
+ .await
+ .map_err(BoxedError::new)
+ .context(ExternalSnafu)?;
+ Ok(util::alive_flownodes(
+ &DefaultSystemTimer,
+ nodes,
+ default_distributed_time_constants().flownode_lease,
+ filter,
+ ))
+ }
+}
+
fn decode_stats(kv: KeyValue) -> MetaResult<DatanodeStatValue> {
DatanodeStatValue::try_from(kv.value)
.map_err(BoxedError::new)
diff --git a/src/meta-client/src/client/util.rs b/src/meta-client/src/client/util.rs
index 758e1f9de1..13f2135039 100644
--- a/src/meta-client/src/client/util.rs
+++ b/src/meta-client/src/client/util.rs
@@ -12,7 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+use api::v1::meta::heartbeat_request::NodeWorkloads;
use api::v1::meta::{ErrorCode, ResponseHeader};
+use common_meta::cluster::{NodeInfo, NodeStatus};
+use common_meta::peer::Peer;
+use common_time::util::SystemTimer;
use tonic::{Code, Status};
pub(crate) fn is_unreachable(status: &Status) -> bool {
@@ -30,3 +34,254 @@ pub(crate) fn is_not_leader(header: &Option) -> bool {
err.code == ErrorCode::NotLeader as i32
}
+
+fn is_active_node(
+ timer: &impl SystemTimer,
+ last_activity_ts: i64,
+ active_duration: std::time::Duration,
+) -> bool {
+ let now = timer.current_time_millis();
+ let elapsed = now.checked_sub(last_activity_ts).unwrap_or(0) as u64;
+ elapsed < active_duration.as_millis() as u64
+}
+
+pub(crate) fn alive_frontends(
+ timer: &impl SystemTimer,
+ nodes: Vec<NodeInfo>,
+ active_duration: std::time::Duration,
+) -> Vec<Peer> {
+ nodes
+ .into_iter()
+ .filter_map(|node| {
+ if matches!(node.status, NodeStatus::Frontend(_))
+ && is_active_node(timer, node.last_activity_ts, active_duration)
+ {
+ Some(node.peer)
+ } else {
+ None
+ }
+ })
+ .collect()
+}
+
+pub(crate) fn alive_datanodes(
+ timer: &impl SystemTimer,
+ nodes: Vec<NodeInfo>,
+ active_duration: std::time::Duration,
+ filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
+) -> Vec<Peer> {
+ let filter = filter.unwrap_or(|_| true);
+
+ nodes
+ .into_iter()
+ .filter_map(|node| {
+ if let NodeStatus::Datanode(status) = node.status
+ && is_active_node(timer, node.last_activity_ts, active_duration)
+ {
+ let workloads = NodeWorkloads::Datanode(status.workloads);
+ filter(&workloads).then_some(node.peer)
+ } else {
+ None
+ }
+ })
+ .collect()
+}
+
+pub(crate) fn alive_flownodes(
+ timer: &impl SystemTimer,
+ nodes: Vec<NodeInfo>,
+ active_duration: std::time::Duration,
+ filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
+) -> Vec<Peer> {
+ let filter = filter.unwrap_or(|_| true);
+
+ nodes
+ .into_iter()
+ .filter_map(|node| {
+ if let NodeStatus::Flownode(status) = node.status
+ && is_active_node(timer, node.last_activity_ts, active_duration)
+ {
+ let workloads = NodeWorkloads::Flownode(status.workloads);
+ filter(&workloads).then_some(node.peer)
+ } else {
+ None
+ }
+ })
+ .collect()
+}
+
+#[cfg(test)]
+mod tests {
+ use std::time::Duration;
+
+ use api::v1::meta::heartbeat_request::NodeWorkloads;
+ use api::v1::meta::{DatanodeWorkloads, FlownodeWorkloads, Peer};
+ use common_meta::cluster::{
+ DatanodeStatus, FlownodeStatus, FrontendStatus, MetasrvStatus, NodeInfo, NodeStatus, Role,
+ };
+ use common_time::util::SystemTimer;
+
+ use super::*;
+
+ struct MockSystemTimer(i64);
+
+ impl MockSystemTimer {
+ fn new(now: i64) -> Self {
+ Self(now)
+ }
+ }
+
+ impl SystemTimer for MockSystemTimer {
+ fn current_time_millis(&self) -> i64 {
+ self.0
+ }
+
+ fn current_time_rfc3339(&self) -> String {
+ "1970-01-01T00:00:00Z".to_string()
+ }
+ }
+
+ fn node_info(role: Role, id: u64, addr: &str, last_activity_ts: i64) -> NodeInfo {
+ let status = match role {
+ Role::Frontend => NodeStatus::Frontend(FrontendStatus {}),
+ Role::Datanode => NodeStatus::Datanode(DatanodeStatus {
+ rcus: 0,
+ wcus: 0,
+ leader_regions: 0,
+ follower_regions: 0,
+ workloads: DatanodeWorkloads { types: vec![] },
+ }),
+ Role::Flownode => NodeStatus::Flownode(FlownodeStatus {
+ workloads: FlownodeWorkloads { types: vec![] },
+ }),
+ Role::Metasrv => NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
+ };
+
+ NodeInfo {
+ peer: Peer::new(id, addr),
+ last_activity_ts,
+ status,
+ version: String::new(),
+ git_commit: String::new(),
+ start_time_ms: 0,
+ total_cpu_millicores: 0,
+ total_memory_bytes: 0,
+ cpu_usage_millicores: 0,
+ memory_usage_bytes: 0,
+ hostname: String::new(),
+ }
+ }
+
+ fn ingest_only(workloads: &NodeWorkloads) -> bool {
+ matches!(
+ workloads,
+ NodeWorkloads::Datanode(DatanodeWorkloads { types }) if types.as_slice() == [1]
+ )
+ }
+
+ fn empty_flownode_workloads(workloads: &NodeWorkloads) -> bool {
+ matches!(
+ workloads,
+ NodeWorkloads::Flownode(FlownodeWorkloads { types }) if types.is_empty()
+ )
+ }
+
+ #[test]
+ fn test_alive_frontends_filters_by_activity_and_role() {
+ let timer = MockSystemTimer::new(100);
+ let peers = alive_frontends(
+ &timer,
+ vec![
+ node_info(Role::Frontend, 1, "127.0.0.1:3001", 95),
+ node_info(Role::Frontend, 2, "127.0.0.1:3002", 89),
+ node_info(Role::Datanode, 3, "127.0.0.1:4001", 99),
+ ],
+ Duration::from_millis(10),
+ );
+
+ assert_eq!(
+ vec![1],
+ peers.into_iter().map(|peer| peer.id).collect::<Vec<_>>()
+ );
+ }
+
+ #[test]
+ fn test_alive_datanodes_filters_by_activity_and_workload() {
+ let timer = MockSystemTimer::new(100);
+ let mut first = node_info(Role::Datanode, 1, "127.0.0.1:4001", 95);
+ let mut second = node_info(Role::Datanode, 2, "127.0.0.1:4002", 95);
+ let stale = node_info(Role::Datanode, 3, "127.0.0.1:4003", 89);
+
+ if let NodeStatus::Datanode(status) = &mut first.status {
+ status.workloads = DatanodeWorkloads { types: vec![1] };
+ }
+ if let NodeStatus::Datanode(status) = &mut second.status {
+ status.workloads = DatanodeWorkloads { types: vec![2] };
+ }
+
+ let peers = alive_datanodes(
+ &timer,
+ vec![first, second, stale],
+ Duration::from_millis(10),
+ Some(ingest_only),
+ );
+
+ assert_eq!(
+ vec![1],
+ peers.into_iter().map(|peer| peer.id).collect::<Vec<_>>()
+ );
+ }
+
+ #[test]
+ fn test_alive_flownodes_uses_empty_workload_semantics() {
+ let timer = MockSystemTimer::new(100);
+ let peers = alive_flownodes(
+ &timer,
+ vec![
+ node_info(Role::Flownode, 1, "127.0.0.1:5001", 95),
+ node_info(Role::Flownode, 2, "127.0.0.1:5002", 89),
+ node_info(Role::Frontend, 3, "127.0.0.1:3001", 99),
+ ],
+ Duration::from_millis(10),
+ Some(empty_flownode_workloads),
+ );
+
+ assert_eq!(
+ vec![1],
+ peers.into_iter().map(|peer| peer.id).collect::<Vec<_>>()
+ );
+ }
+
+ #[test]
+ fn test_alive_flownodes_filters_by_workloads() {
+ let timer = MockSystemTimer::new(100);
+ let mut first = node_info(Role::Flownode, 1, "127.0.0.1:5001", 95);
+ let mut second = node_info(Role::Flownode, 2, "127.0.0.1:5002", 95);
+
+ if let NodeStatus::Flownode(status) = &mut first.status {
+ status.workloads = FlownodeWorkloads { types: vec![7] };
+ }
+ if let NodeStatus::Flownode(status) = &mut second.status {
+ status.workloads = FlownodeWorkloads { types: vec![8] };
+ }
+
+ fn workload_type_is_7(workloads: &NodeWorkloads) -> bool {
+ matches!(
+ workloads,
+ NodeWorkloads::Flownode(FlownodeWorkloads { types }) if types.as_slice() == [7]
+ )
+ }
+
+ let peers = alive_flownodes(
+ &timer,
+ vec![first, second],
+ Duration::from_millis(10),
+ Some(workload_type_is_7),
+ );
+
+ assert_eq!(
+ vec![1],
+ peers.into_iter().map(|peer| peer.id).collect::<Vec<_>>()
+ );
+ }
+}
diff --git a/src/meta-srv/src/handler/collect_cluster_info_handler.rs b/src/meta-srv/src/handler/collect_cluster_info_handler.rs
index 3fc785a1cb..09286c5836 100644
--- a/src/meta-srv/src/handler/collect_cluster_info_handler.rs
+++ b/src/meta-srv/src/handler/collect_cluster_info_handler.rs
@@ -16,6 +16,7 @@ use api::v1::meta::{HeartbeatRequest, NodeInfo as PbNodeInfo, Role};
use common_meta::cluster::{
DatanodeStatus, FlownodeStatus, FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus,
};
+use common_meta::heartbeat::utils::get_flownode_workloads;
use common_meta::peer::Peer;
use common_meta::rpc::store::PutRequest;
use snafu::ResultExt;
@@ -82,11 +83,14 @@ impl HeartbeatHandler for CollectFlownodeClusterInfoHandler {
let Some((key, peer, info)) = extract_base_info(req) else {
return Ok(HandleControl::Continue);
};
+ let flownode_workloads = get_flownode_workloads(req.node_workloads.as_ref());
let value = NodeInfo {
peer,
last_activity_ts: common_time::util::current_time_millis(),
- status: NodeStatus::Flownode(FlownodeStatus {}),
+ status: NodeStatus::Flownode(FlownodeStatus {
+ workloads: flownode_workloads,
+ }),
version: info.version,
git_commit: info.git_commit,
start_time_ms: info.start_time_ms,
diff --git a/src/meta-srv/src/handler/keep_lease_handler.rs b/src/meta-srv/src/handler/keep_lease_handler.rs
index 45570a831d..426ff545c9 100644
--- a/src/meta-srv/src/handler/keep_lease_handler.rs
+++ b/src/meta-srv/src/handler/keep_lease_handler.rs
@@ -13,8 +13,8 @@
// limitations under the License.
use api::v1::meta::heartbeat_request::NodeWorkloads;
-use api::v1::meta::{FlownodeWorkloads, HeartbeatRequest, Peer, Role};
-use common_meta::heartbeat::utils::get_datanode_workloads;
+use api::v1::meta::{HeartbeatRequest, Peer, Role};
+use common_meta::heartbeat::utils::{get_datanode_workloads, get_flownode_workloads};
use common_meta::rpc::store::PutRequest;
use common_telemetry::{trace, warn};
use common_time::util as time_util;
@@ -85,7 +85,12 @@ impl HeartbeatHandler for FlownodeKeepLeaseHandler {
ctx: &mut Context,
_acc: &mut HeartbeatAccumulator,
) -> Result {
- let HeartbeatRequest { header, peer, .. } = req;
+ let HeartbeatRequest {
+ header,
+ peer,
+ node_workloads,
+ ..
+ } = req;
let Some(_header) = &header else {
return Ok(HandleControl::Continue);
};
@@ -97,7 +102,7 @@ impl HeartbeatHandler for FlownodeKeepLeaseHandler {
let value = LeaseValue {
timestamp_millis: time_util::current_time_millis(),
node_addr: peer.addr.clone(),
- workloads: NodeWorkloads::Flownode(FlownodeWorkloads { types: vec![] }),
+ workloads: NodeWorkloads::Flownode(get_flownode_workloads(node_workloads.as_ref())),
};
trace!("Receive a heartbeat from flownode: {key:?}, {value:?}");
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index a195e85e35..657a9447e7 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -1537,7 +1537,6 @@ experimental_min_refresh_duration = "5s"
grpc_conn_timeout = "5s"
experimental_grpc_max_retries = 3
experimental_frontend_scan_timeout = "30s"
-experimental_frontend_activity_timeout = "1m"
experimental_max_filter_num_per_query = 20
experimental_time_window_merge_threshold = 3
read_preference = "Leader"