refactor: unify frontend discovery with active peer discovery (#8031)

* feat: add MetaClient peer discovery support

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor: reuse peer discovery for flow frontend discovery

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor: use active frontend peers in process manager

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: add comments

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2026-04-27 14:40:37 +08:00
committed by GitHub
parent d2d256909f
commit 0c942fc23a
17 changed files with 436 additions and 78 deletions

1
Cargo.lock generated
View File

@@ -7918,6 +7918,7 @@ dependencies = [
"common-meta",
"common-options",
"common-telemetry",
"common-time",
"datatypes",
"futures",
"futures-util",

View File

@@ -633,7 +633,6 @@
| `flow.batching_mode.grpc_conn_timeout` | String | `5s` | The gRPC connection timeout |
| `flow.batching_mode.experimental_grpc_max_retries` | Integer | `3` | The gRPC max retry number |
| `flow.batching_mode.experimental_frontend_scan_timeout` | String | `30s` | Timeout for the flow to wait for an available frontend;<br/>if no available frontend is found after frontend_scan_timeout has elapsed, an error is returned,<br/>which prevents the flownode from starting |
| `flow.batching_mode.experimental_frontend_activity_timeout` | String | `60s` | Frontend activity timeout<br/>if frontend is down(not sending heartbeat) for more than frontend_activity_timeout,<br/>it will be removed from the list that flownode use to connect |
| `flow.batching_mode.experimental_max_filter_num_per_query` | Integer | `20` | Maximum number of filters allowed in a single query |
| `flow.batching_mode.experimental_time_window_merge_threshold` | Integer | `3` | Time window merge distance |
| `flow.batching_mode.read_preference` | String | `Leader` | Read preference of the Frontend client. |

View File

@@ -22,10 +22,6 @@ node_id = 14
## if no available frontend is found after frontend_scan_timeout has elapsed, an error is returned,
## which prevents the flownode from starting
#+experimental_frontend_scan_timeout="30s"
## Frontend activity timeout
## if frontend is down(not sending heartbeat) for more than frontend_activity_timeout,
## it will be removed from the list that flownode use to connect
#+experimental_frontend_activity_timeout="60s"
## Maximum number of filters allowed in a single query
#+experimental_max_filter_num_per_query=20
## Time window merge distance

View File

@@ -173,7 +173,7 @@ impl ProcessManager {
let mut processes = vec![];
if let Some(remote_frontend_selector) = self.frontend_selector.as_ref() {
let frontends = remote_frontend_selector
.select(|node| node.peer.addr != self.server_addr)
.select(|peer| peer.addr != self.server_addr)
.await
.context(error::InvokeFrontendSnafu)?;
for mut f in frontends {
@@ -211,7 +211,7 @@ impl ProcessManager {
.frontend_selector
.as_ref()
.context(error::MetaClientMissingSnafu)?
.select(|node| node.peer.addr == server_addr)
.select(|peer| peer.addr == server_addr)
.await
.context(error::InvokeFrontendSnafu)?;
ensure!(

View File

@@ -30,7 +30,7 @@ pub enum Error {
#[snafu(display("Failed to list nodes from metasrv"))]
Meta {
source: Box<meta_client::error::Error>,
source: Box<common_meta::error::Error>,
#[snafu(implicit)]
location: Location,
},

View File

@@ -16,7 +16,7 @@ use std::fmt::Debug;
use std::time::Duration;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::cluster::{ClusterInfo, NodeInfo, Role};
use common_meta::peer::{Peer, PeerDiscovery};
use greptime_proto::v1::frontend::{
KillProcessRequest, KillProcessResponse, ListProcessRequest, ListProcessResponse,
frontend_client,
@@ -62,7 +62,7 @@ impl FrontendClient for frontend_client::FrontendClient<tonic::transport::channe
pub trait FrontendSelector {
async fn select<F>(&self, predicate: F) -> Result<Vec<FrontendClientPtr>>
where
F: Fn(&NodeInfo) -> bool + Send;
F: Fn(&Peer) -> bool + Send;
}
#[derive(Debug, Clone)]
@@ -75,22 +75,22 @@ pub struct MetaClientSelector {
impl FrontendSelector for MetaClientSelector {
async fn select<F>(&self, predicate: F) -> Result<Vec<FrontendClientPtr>>
where
F: Fn(&NodeInfo) -> bool + Send,
F: Fn(&Peer) -> bool + Send,
{
let nodes = self
let peers = self
.meta_client
.list_nodes(Some(Role::Frontend))
.active_frontends()
.await
.map_err(Box::new)
.context(MetaSnafu)?;
nodes
peers
.into_iter()
.filter(predicate)
.map(|node| {
.map(|peer| {
let channel = self
.channel_manager
.get(node.peer.addr)
.get(peer.addr)
.map_err(Box::new)
.context(error::CreateChannelSnafu)?;
let client = frontend_client::FrontendClient::new(channel);

View File

@@ -16,7 +16,7 @@ use std::fmt::{Display, Formatter};
use std::hash::{DefaultHasher, Hash, Hasher};
use std::str::FromStr;
use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest};
use api::v1::meta::{DatanodeWorkloads, FlownodeWorkloads, HeartbeatRequest};
use common_error::ext::ErrorExt;
use lazy_static::lazy_static;
use regex::Regex;
@@ -187,7 +187,11 @@ pub struct FrontendStatus {}
/// The status of a flownode.
#[derive(Debug, Serialize, Deserialize)]
pub struct FlownodeStatus {}
pub struct FlownodeStatus {
/// The workloads of the flownode.
#[serde(default)]
pub workloads: FlownodeWorkloads,
}
/// The status of a metasrv.
#[derive(Debug, Serialize, Deserialize)]
@@ -309,7 +313,7 @@ mod tests {
use super::*;
use crate::cluster::Role::{Datanode, Frontend};
use crate::cluster::{DatanodeStatus, NodeInfo, NodeInfoKey, NodeStatus};
use crate::cluster::{DatanodeStatus, FlownodeStatus, NodeInfo, NodeInfoKey, NodeStatus};
use crate::peer::Peer;
#[test]
@@ -404,4 +408,60 @@ mod tests {
let id4 = calculate_node_id(long_addr);
assert!(id4 > 0);
}
#[test]
fn test_flownode_status_backward_compatible_without_workloads() {
let raw = r#"{
"peer":{"id":1,"addr":"127.0.0.1"},
"last_activity_ts":123,
"status":{"Flownode":{}},
"version":"",
"git_commit":"",
"start_time_ms":1,
"total_cpu_millicores":0,
"total_memory_bytes":0,
"cpu_usage_millicores":0,
"memory_usage_bytes":0,
"hostname":""
}"#;
let node_info: NodeInfo = raw.parse().unwrap();
assert_matches!(
node_info.status,
NodeStatus::Flownode(FlownodeStatus { workloads }) if workloads.types.is_empty()
);
}
#[test]
fn test_flownode_status_round_trip_with_workloads() {
let node_info = NodeInfo {
peer: Peer {
id: 1,
addr: "127.0.0.1".to_string(),
},
last_activity_ts: 123,
status: NodeStatus::Flownode(FlownodeStatus {
workloads: FlownodeWorkloads { types: vec![7] },
}),
version: "".to_string(),
git_commit: "".to_string(),
start_time_ms: 1,
total_cpu_millicores: 0,
total_memory_bytes: 0,
cpu_usage_millicores: 0,
memory_usage_bytes: 0,
hostname: "test_hostname".to_string(),
};
let node_info_bytes: Vec<u8> = node_info.try_into().unwrap();
let new_node_info: NodeInfo = node_info_bytes.try_into().unwrap();
assert_matches!(
new_node_info,
NodeInfo {
status: NodeStatus::Flownode(FlownodeStatus { workloads }),
..
} if workloads.types == vec![7]
);
}
}

View File

@@ -14,7 +14,7 @@
use api::v1::meta::heartbeat_request::NodeWorkloads;
use api::v1::meta::mailbox_message::Payload;
use api::v1::meta::{DatanodeWorkloads, MailboxMessage};
use api::v1::meta::{DatanodeWorkloads, FlownodeWorkloads, MailboxMessage};
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::warn;
use common_time::util::current_time_millis;
@@ -90,6 +90,16 @@ pub fn get_datanode_workloads(node_workloads: Option<&NodeWorkloads>) -> Datanod
}
}
/// Extracts flownode workloads from the provided optional `NodeWorkloads`.
///
/// Returns empty flownode workloads if the input is `None` or not a flownode payload.
pub fn get_flownode_workloads(node_workloads: Option<&NodeWorkloads>) -> FlownodeWorkloads {
match node_workloads {
Some(NodeWorkloads::Flownode(flownode_workloads)) => flownode_workloads.clone(),
_ => FlownodeWorkloads { types: vec![] },
}
}
#[cfg(test)]
mod tests {
use super::*;
@@ -102,4 +112,16 @@ mod tests {
let workloads = get_datanode_workloads(node_workloads.as_ref());
assert_eq!(workloads.types, vec![DatanodeWorkloadType::Hybrid.to_i32()]);
}
#[test]
fn test_get_flownode_workloads() {
let node_workloads = Some(NodeWorkloads::Flownode(FlownodeWorkloads {
types: vec![7],
}));
let workloads = get_flownode_workloads(node_workloads.as_ref());
assert_eq!(workloads.types, vec![7]);
let workloads = get_flownode_workloads(None);
assert!(workloads.types.is_empty());
}
}

View File

@@ -213,7 +213,7 @@ impl FlowDualEngine {
if !frontend_list.is_empty() {
let fe_list = frontend_list
.iter()
.map(|(_, info)| &info.peer.addr)
.map(|peer| &peer.addr)
.collect::<Vec<_>>();
info!("Available frontend found: {:?}", fe_list);
return Ok(());

View File

@@ -48,10 +48,6 @@ pub struct BatchingModeOptions {
/// which prevents the flownode from starting
#[serde(with = "humantime_serde")]
pub experimental_frontend_scan_timeout: Duration,
/// Frontend activity timeout
/// if frontend is down(not sending heartbeat) for more than frontend_activity_timeout, it will be removed from the list that flownode use to connect
#[serde(with = "humantime_serde")]
pub experimental_frontend_activity_timeout: Duration,
/// Maximum number of filters allowed in a single query
pub experimental_max_filter_num_per_query: usize,
/// Time window merge distance
@@ -71,7 +67,6 @@ impl Default for BatchingModeOptions {
grpc_conn_timeout: Duration::from_secs(5),
experimental_grpc_max_retries: 3,
experimental_frontend_scan_timeout: Duration::from_secs(30),
experimental_frontend_activity_timeout: Duration::from_secs(60),
experimental_max_filter_num_per_query: 20,
experimental_time_window_merge_threshold: 3,
read_preference: Default::default(),

View File

@@ -16,7 +16,6 @@
use std::collections::HashMap;
use std::sync::{Arc, Mutex, Weak};
use std::time::SystemTime;
use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
@@ -24,9 +23,7 @@ use api::v1::{CreateTableExpr, QueryRequest};
use client::{Client, Database};
use common_error::ext::BoxedError;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager, load_client_tls_config};
use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
use common_meta::peer::Peer;
use common_meta::rpc::store::RangeRequest;
use common_meta::peer::{Peer, PeerDiscovery};
use common_query::Output;
use common_telemetry::warn;
use meta_client::client::MetaClient;
@@ -200,38 +197,19 @@ impl DatabaseWithPeer {
impl FrontendClient {
/// scan for available frontend from metadata
pub(crate) async fn scan_for_frontend(&self) -> Result<Vec<(NodeInfoKey, NodeInfo)>, Error> {
pub(crate) async fn scan_for_frontend(&self) -> Result<Vec<Peer>, Error> {
let Self::Distributed { meta_client, .. } = self else {
return Ok(vec![]);
};
let cluster_client = meta_client
.cluster_client()
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let prefix = NodeInfoKey::key_prefix_with_role(Role::Frontend);
let req = RangeRequest::new().with_prefix(prefix);
let resp = cluster_client
.range(req)
meta_client
.active_frontends()
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let mut res = Vec::with_capacity(resp.kvs.len());
for kv in resp.kvs {
let key = NodeInfoKey::try_from(kv.key)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let val = NodeInfo::try_from(kv.value)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
res.push((key, val));
}
Ok(res)
.context(ExternalSnafu)
}
/// Get the frontend with recent enough(less than 1 minute from now) `last_activity_ts`
/// and is able to process query
/// Get a frontend discovered by metasrv and verified with a query probe.
async fn get_random_active_frontend(
&self,
catalog: &str,
@@ -255,26 +233,11 @@ impl FrontendClient {
interval.tick().await;
for retry in 0..batch_opts.experimental_grpc_max_retries {
let mut frontends = self.scan_for_frontend().await?;
let now_in_ms = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis() as i64;
// shuffle the frontends to avoid always pick the same one
frontends.shuffle(&mut rng());
// found node with maximum last_activity_ts
for (_, node_info) in frontends
.iter()
// filter out frontend that have been down for more than 1 min
.filter(|(_, node_info)| {
node_info.last_activity_ts
+ batch_opts
.experimental_frontend_activity_timeout
.as_millis() as i64
> now_in_ms
})
{
let addr = &node_info.peer.addr;
for peer in frontends {
let addr = peer.addr.clone();
let client = Client::with_manager_and_urls(chnl_mgr.clone(), vec![addr.clone()]);
let database = {
let mut db = Database::new(catalog, schema, client);
@@ -283,7 +246,7 @@ impl FrontendClient {
}
db
};
let db = DatabaseWithPeer::new(database, node_info.peer.clone());
let db = DatabaseWithPeer::new(database, peer);
match db.try_select_one().await {
Ok(_) => return Ok(db),
Err(e) => {

View File

@@ -17,6 +17,7 @@ common-macro.workspace = true
common-meta.workspace = true
common-options.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
futures.workspace = true
futures-util.workspace = true
humantime-serde.workspace = true

View File

@@ -26,6 +26,7 @@ use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::heartbeat_request::NodeWorkloads;
use api::v1::meta::{
MetasrvNodeInfo, ProcedureDetailResponse, ReconcileRequest, ReconcileResponse, Role,
};
@@ -38,11 +39,13 @@ use common_meta::cluster::{
ClusterInfo, MetasrvStatus, NodeInfo, NodeInfoKey, NodeStatus, Role as ClusterRole,
};
use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue, RegionStat};
use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::error::{
self as meta_error, ExternalSnafu, Result as MetaResult, UnsupportedSnafu,
};
use common_meta::key::flow::flow_state::{FlowStat, FlowStateManager};
use common_meta::kv_backend::KvBackendRef;
use common_meta::peer::{Peer, PeerDiscovery};
use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutor};
use common_meta::range_stream::PaginationStream;
use common_meta::rpc::KeyValue;
@@ -59,6 +62,7 @@ use common_meta::rpc::store::{
};
use common_options::plugin_options::PluginOptionsDeserializer;
use common_telemetry::info;
use common_time::util::DefaultSystemTimer;
use config::Client as ConfigClient;
use futures::TryStreamExt;
use heartbeat::{Client as HeartbeatClient, HeartbeatConfig};
@@ -553,6 +557,60 @@ impl ClusterInfo for MetaClient {
}
}
// TODO(weny): the discovery using client side timestamp may be inaccurate,
// maybe we need to use the timestamp from metasrv in the future.
#[async_trait::async_trait]
impl PeerDiscovery for MetaClient {
async fn active_frontends(&self) -> MetaResult<Vec<Peer>> {
let nodes = self
.list_nodes(Some(ClusterRole::Frontend))
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
Ok(util::alive_frontends(
&DefaultSystemTimer,
nodes,
// TODO(weny): the heartbeat interval should be received from metasrv
// instead of using the default value.
default_distributed_time_constants().frontend_heartbeat_interval,
))
}
async fn active_datanodes(
&self,
filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
) -> MetaResult<Vec<Peer>> {
let nodes = self
.list_nodes(Some(ClusterRole::Datanode))
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
Ok(util::alive_datanodes(
&DefaultSystemTimer,
nodes,
default_distributed_time_constants().datanode_lease,
filter,
))
}
async fn active_flownodes(
&self,
filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
) -> MetaResult<Vec<Peer>> {
let nodes = self
.list_nodes(Some(ClusterRole::Flownode))
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
Ok(util::alive_flownodes(
&DefaultSystemTimer,
nodes,
default_distributed_time_constants().flownode_lease,
filter,
))
}
}
fn decode_stats(kv: KeyValue) -> MetaResult<DatanodeStatValue> {
DatanodeStatValue::try_from(kv.value)
.map_err(BoxedError::new)

View File

@@ -12,7 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::heartbeat_request::NodeWorkloads;
use api::v1::meta::{ErrorCode, ResponseHeader};
use common_meta::cluster::{NodeInfo, NodeStatus};
use common_meta::peer::Peer;
use common_time::util::SystemTimer;
use tonic::{Code, Status};
pub(crate) fn is_unreachable(status: &Status) -> bool {
@@ -30,3 +34,254 @@ pub(crate) fn is_not_leader(header: &Option<ResponseHeader>) -> bool {
err.code == ErrorCode::NotLeader as i32
}
fn is_active_node(
timer: &impl SystemTimer,
last_activity_ts: i64,
active_duration: std::time::Duration,
) -> bool {
let now = timer.current_time_millis();
let elapsed = now.checked_sub(last_activity_ts).unwrap_or(0) as u64;
elapsed < active_duration.as_millis() as u64
}
pub(crate) fn alive_frontends(
timer: &impl SystemTimer,
nodes: Vec<NodeInfo>,
active_duration: std::time::Duration,
) -> Vec<Peer> {
nodes
.into_iter()
.filter_map(|node| {
if matches!(node.status, NodeStatus::Frontend(_))
&& is_active_node(timer, node.last_activity_ts, active_duration)
{
Some(node.peer)
} else {
None
}
})
.collect()
}
pub(crate) fn alive_datanodes(
timer: &impl SystemTimer,
nodes: Vec<NodeInfo>,
active_duration: std::time::Duration,
filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
) -> Vec<Peer> {
let filter = filter.unwrap_or(|_| true);
nodes
.into_iter()
.filter_map(|node| {
if let NodeStatus::Datanode(status) = node.status
&& is_active_node(timer, node.last_activity_ts, active_duration)
{
let workloads = NodeWorkloads::Datanode(status.workloads);
filter(&workloads).then_some(node.peer)
} else {
None
}
})
.collect()
}
pub(crate) fn alive_flownodes(
timer: &impl SystemTimer,
nodes: Vec<NodeInfo>,
active_duration: std::time::Duration,
filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
) -> Vec<Peer> {
let filter = filter.unwrap_or(|_| true);
nodes
.into_iter()
.filter_map(|node| {
if let NodeStatus::Flownode(status) = node.status
&& is_active_node(timer, node.last_activity_ts, active_duration)
{
let workloads = NodeWorkloads::Flownode(status.workloads);
filter(&workloads).then_some(node.peer)
} else {
None
}
})
.collect()
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use api::v1::meta::heartbeat_request::NodeWorkloads;
use api::v1::meta::{DatanodeWorkloads, FlownodeWorkloads, Peer};
use common_meta::cluster::{
DatanodeStatus, FlownodeStatus, FrontendStatus, MetasrvStatus, NodeInfo, NodeStatus, Role,
};
use common_time::util::SystemTimer;
use super::*;
struct MockSystemTimer(i64);
impl MockSystemTimer {
fn new(now: i64) -> Self {
Self(now)
}
}
impl SystemTimer for MockSystemTimer {
fn current_time_millis(&self) -> i64 {
self.0
}
fn current_time_rfc3339(&self) -> String {
"1970-01-01T00:00:00Z".to_string()
}
}
fn node_info(role: Role, id: u64, addr: &str, last_activity_ts: i64) -> NodeInfo {
let status = match role {
Role::Frontend => NodeStatus::Frontend(FrontendStatus {}),
Role::Datanode => NodeStatus::Datanode(DatanodeStatus {
rcus: 0,
wcus: 0,
leader_regions: 0,
follower_regions: 0,
workloads: DatanodeWorkloads { types: vec![] },
}),
Role::Flownode => NodeStatus::Flownode(FlownodeStatus {
workloads: FlownodeWorkloads { types: vec![] },
}),
Role::Metasrv => NodeStatus::Metasrv(MetasrvStatus { is_leader: false }),
};
NodeInfo {
peer: Peer::new(id, addr),
last_activity_ts,
status,
version: String::new(),
git_commit: String::new(),
start_time_ms: 0,
total_cpu_millicores: 0,
total_memory_bytes: 0,
cpu_usage_millicores: 0,
memory_usage_bytes: 0,
hostname: String::new(),
}
}
fn ingest_only(workloads: &NodeWorkloads) -> bool {
matches!(
workloads,
NodeWorkloads::Datanode(DatanodeWorkloads { types }) if types.as_slice() == [1]
)
}
fn empty_flownode_workloads(workloads: &NodeWorkloads) -> bool {
matches!(
workloads,
NodeWorkloads::Flownode(FlownodeWorkloads { types }) if types.is_empty()
)
}
#[test]
fn test_alive_frontends_filters_by_activity_and_role() {
let timer = MockSystemTimer::new(100);
let peers = alive_frontends(
&timer,
vec![
node_info(Role::Frontend, 1, "127.0.0.1:3001", 95),
node_info(Role::Frontend, 2, "127.0.0.1:3002", 89),
node_info(Role::Datanode, 3, "127.0.0.1:4001", 99),
],
Duration::from_millis(10),
);
assert_eq!(
vec![1],
peers.into_iter().map(|peer| peer.id).collect::<Vec<_>>()
);
}
#[test]
fn test_alive_datanodes_filters_by_activity_and_workload() {
let timer = MockSystemTimer::new(100);
let mut first = node_info(Role::Datanode, 1, "127.0.0.1:4001", 95);
let mut second = node_info(Role::Datanode, 2, "127.0.0.1:4002", 95);
let stale = node_info(Role::Datanode, 3, "127.0.0.1:4003", 89);
if let NodeStatus::Datanode(status) = &mut first.status {
status.workloads = DatanodeWorkloads { types: vec![1] };
}
if let NodeStatus::Datanode(status) = &mut second.status {
status.workloads = DatanodeWorkloads { types: vec![2] };
}
let peers = alive_datanodes(
&timer,
vec![first, second, stale],
Duration::from_millis(10),
Some(ingest_only),
);
assert_eq!(
vec![1],
peers.into_iter().map(|peer| peer.id).collect::<Vec<_>>()
);
}
#[test]
fn test_alive_flownodes_uses_empty_workload_semantics() {
let timer = MockSystemTimer::new(100);
let peers = alive_flownodes(
&timer,
vec![
node_info(Role::Flownode, 1, "127.0.0.1:5001", 95),
node_info(Role::Flownode, 2, "127.0.0.1:5002", 89),
node_info(Role::Frontend, 3, "127.0.0.1:3001", 99),
],
Duration::from_millis(10),
Some(empty_flownode_workloads),
);
assert_eq!(
vec![1],
peers.into_iter().map(|peer| peer.id).collect::<Vec<_>>()
);
}
#[test]
fn test_alive_flownodes_filters_by_workloads() {
let timer = MockSystemTimer::new(100);
let mut first = node_info(Role::Flownode, 1, "127.0.0.1:5001", 95);
let mut second = node_info(Role::Flownode, 2, "127.0.0.1:5002", 95);
if let NodeStatus::Flownode(status) = &mut first.status {
status.workloads = FlownodeWorkloads { types: vec![7] };
}
if let NodeStatus::Flownode(status) = &mut second.status {
status.workloads = FlownodeWorkloads { types: vec![8] };
}
fn workload_type_is_7(workloads: &NodeWorkloads) -> bool {
matches!(
workloads,
NodeWorkloads::Flownode(FlownodeWorkloads { types }) if types.as_slice() == [7]
)
}
let peers = alive_flownodes(
&timer,
vec![first, second],
Duration::from_millis(10),
Some(workload_type_is_7),
);
assert_eq!(
vec![1],
peers.into_iter().map(|peer| peer.id).collect::<Vec<_>>()
);
}
}

View File

@@ -16,6 +16,7 @@ use api::v1::meta::{HeartbeatRequest, NodeInfo as PbNodeInfo, Role};
use common_meta::cluster::{
DatanodeStatus, FlownodeStatus, FrontendStatus, NodeInfo, NodeInfoKey, NodeStatus,
};
use common_meta::heartbeat::utils::get_flownode_workloads;
use common_meta::peer::Peer;
use common_meta::rpc::store::PutRequest;
use snafu::ResultExt;
@@ -82,11 +83,14 @@ impl HeartbeatHandler for CollectFlownodeClusterInfoHandler {
let Some((key, peer, info)) = extract_base_info(req) else {
return Ok(HandleControl::Continue);
};
let flownode_workloads = get_flownode_workloads(req.node_workloads.as_ref());
let value = NodeInfo {
peer,
last_activity_ts: common_time::util::current_time_millis(),
status: NodeStatus::Flownode(FlownodeStatus {}),
status: NodeStatus::Flownode(FlownodeStatus {
workloads: flownode_workloads,
}),
version: info.version,
git_commit: info.git_commit,
start_time_ms: info.start_time_ms,

View File

@@ -13,8 +13,8 @@
// limitations under the License.
use api::v1::meta::heartbeat_request::NodeWorkloads;
use api::v1::meta::{FlownodeWorkloads, HeartbeatRequest, Peer, Role};
use common_meta::heartbeat::utils::get_datanode_workloads;
use api::v1::meta::{HeartbeatRequest, Peer, Role};
use common_meta::heartbeat::utils::{get_datanode_workloads, get_flownode_workloads};
use common_meta::rpc::store::PutRequest;
use common_telemetry::{trace, warn};
use common_time::util as time_util;
@@ -85,7 +85,12 @@ impl HeartbeatHandler for FlownodeKeepLeaseHandler {
ctx: &mut Context,
_acc: &mut HeartbeatAccumulator,
) -> Result<HandleControl> {
let HeartbeatRequest { header, peer, .. } = req;
let HeartbeatRequest {
header,
peer,
node_workloads,
..
} = req;
let Some(_header) = &header else {
return Ok(HandleControl::Continue);
};
@@ -97,7 +102,7 @@ impl HeartbeatHandler for FlownodeKeepLeaseHandler {
let value = LeaseValue {
timestamp_millis: time_util::current_time_millis(),
node_addr: peer.addr.clone(),
workloads: NodeWorkloads::Flownode(FlownodeWorkloads { types: vec![] }),
workloads: NodeWorkloads::Flownode(get_flownode_workloads(node_workloads.as_ref())),
};
trace!("Receive a heartbeat from flownode: {key:?}, {value:?}");

View File

@@ -1537,7 +1537,6 @@ experimental_min_refresh_duration = "5s"
grpc_conn_timeout = "5s"
experimental_grpc_max_retries = 3
experimental_frontend_scan_timeout = "30s"
experimental_frontend_activity_timeout = "1m"
experimental_max_filter_num_per_query = 20
experimental_time_window_merge_threshold = 3
read_preference = "Leader"