diff --git a/src/meta-srv/src/discovery/lease.rs b/src/meta-srv/src/discovery/lease.rs index 8f078d5ddf..557437964e 100644 --- a/src/meta-srv/src/discovery/lease.rs +++ b/src/meta-srv/src/discovery/lease.rs @@ -107,7 +107,7 @@ mod tests { use common_time::util::current_time_millis; use common_workload::DatanodeWorkloadType; - use crate::discovery::utils::{self, is_datanode_accept_ingest_workload}; + use crate::discovery::utils::{self, accept_ingest_workload}; use crate::key::{DatanodeLeaseKey, LeaseValue}; use crate::test_util::create_meta_peer_client; @@ -219,7 +219,7 @@ mod tests { let peers = utils::alive_datanodes( client.as_ref(), Duration::from_secs(lease_secs), - Some(is_datanode_accept_ingest_workload), + Some(accept_ingest_workload), ) .await .unwrap(); diff --git a/src/meta-srv/src/discovery/utils.rs b/src/meta-srv/src/discovery/utils.rs index ace6adc5da..9a8ec7a82c 100644 --- a/src/meta-srv/src/discovery/utils.rs +++ b/src/meta-srv/src/discovery/utils.rs @@ -144,19 +144,22 @@ pub async fn alive_datanode( Ok(v) } -/// Returns true if the datanode can accept ingest workload based on its workload types. +/// Determines if a datanode is capable of accepting ingest workloads. +/// Returns `true` if the datanode's workload types include ingest capability, +/// or if the node is not of type [NodeWorkloads::Datanode]. /// /// A datanode is considered to accept ingest workload if it supports either: /// - Hybrid workload (both ingest and query workloads) /// - Ingest workload (only ingest workload) -pub fn is_datanode_accept_ingest_workload(datanode_workloads: &NodeWorkloads) -> bool { +pub fn accept_ingest_workload(datanode_workloads: &NodeWorkloads) -> bool { match &datanode_workloads { NodeWorkloads::Datanode(workloads) => workloads .types .iter() .filter_map(|w| DatanodeWorkloadType::from_i32(*w)) .any(|w| w.accept_ingest()), - _ => false, + // If the [NodeWorkloads] type is not [NodeWorkloads::Datanode], returns true. + _ => true, } } diff --git a/src/meta-srv/src/peer.rs b/src/meta-srv/src/peer.rs index f7ea784548..2672a239e0 100644 --- a/src/meta-srv/src/peer.rs +++ b/src/meta-srv/src/peer.rs @@ -20,6 +20,7 @@ use common_meta::error::{ExternalSnafu, Result as MetaResult}; use common_meta::peer::{Peer, PeerAllocator}; use snafu::{ResultExt, ensure}; +use crate::discovery::utils::accept_ingest_workload; use crate::error::{Result, TooManyPartitionsSnafu}; use crate::metasrv::{SelectorContext, SelectorRef}; use crate::selector::SelectorOptions; @@ -69,6 +70,7 @@ impl MetasrvPeerAllocator { min_required_items, allow_duplication: true, exclude_peer_ids: HashSet::new(), + workload_filter: Some(accept_ingest_workload), }, ) .await diff --git a/src/meta-srv/src/region/supervisor.rs b/src/meta-srv/src/region/supervisor.rs index 48cc7ae092..cb198b6787 100644 --- a/src/meta-srv/src/region/supervisor.rs +++ b/src/meta-srv/src/region/supervisor.rs @@ -40,6 +40,7 @@ use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::oneshot; use tokio::time::{MissedTickBehavior, interval, interval_at}; +use crate::discovery::utils::accept_ingest_workload; use crate::error::{self, Result}; use crate::failure_detector::PhiAccrualFailureDetectorOptions; use crate::metasrv::{RegionStatAwareSelectorRef, SelectTarget, SelectorContext, SelectorRef}; @@ -584,6 +585,7 @@ impl RegionSupervisor { min_required_items: regions.len(), allow_duplication: true, exclude_peer_ids, + workload_filter: Some(accept_ingest_workload), }; let peers = selector.select(&self.selector_context, opt).await?; ensure!( diff --git a/src/meta-srv/src/selector.rs b/src/meta-srv/src/selector.rs index 92e55a2149..2562c6bfb7 100644 --- a/src/meta-srv/src/selector.rs +++ b/src/meta-srv/src/selector.rs @@ -22,6 +22,7 @@ pub mod weight_compute; pub mod weighted_choose; use std::collections::HashSet; +use api::v1::meta::heartbeat_request::NodeWorkloads; use serde::{Deserialize, Serialize}; use store_api::storage::RegionId; use strum::AsRefStr; @@ -63,6 +64,8 @@ pub struct SelectorOptions { pub allow_duplication: bool, /// The peers to exclude from the selection. pub exclude_peer_ids: HashSet, + /// The filter to select the peers based on their workloads. + pub workload_filter: Option bool>, } impl Default for SelectorOptions { @@ -71,6 +74,7 @@ impl Default for SelectorOptions { min_required_items: 1, allow_duplication: false, exclude_peer_ids: HashSet::new(), + workload_filter: None, } } } diff --git a/src/meta-srv/src/selector/common.rs b/src/meta-srv/src/selector/common.rs index 341b9106d6..70da22aedd 100644 --- a/src/meta-srv/src/selector/common.rs +++ b/src/meta-srv/src/selector/common.rs @@ -139,6 +139,7 @@ mod tests { min_required_items: i, allow_duplication: false, exclude_peer_ids: HashSet::new(), + workload_filter: None, }; let selected_peers: HashSet<_> = @@ -154,6 +155,7 @@ mod tests { min_required_items: 6, allow_duplication: false, exclude_peer_ids: HashSet::new(), + workload_filter: None, }; let selected_result = @@ -165,6 +167,7 @@ mod tests { min_required_items: i, allow_duplication: true, exclude_peer_ids: HashSet::new(), + workload_filter: None, }; let selected_peers = diff --git a/src/meta-srv/src/selector/lease_based.rs b/src/meta-srv/src/selector/lease_based.rs index c91aeeee04..ad5993696c 100644 --- a/src/meta-srv/src/selector/lease_based.rs +++ b/src/meta-srv/src/selector/lease_based.rs @@ -15,7 +15,6 @@ use common_meta::peer::Peer; use snafu::ResultExt; -use crate::discovery::utils::is_datanode_accept_ingest_workload; use crate::error::{ListActiveDatanodesSnafu, Result}; use crate::metasrv::SelectorContext; use crate::selector::common::{choose_items, filter_out_excluded_peers}; @@ -35,7 +34,7 @@ impl Selector for LeaseBasedSelector { // 1. get alive datanodes. let alive_datanodes = ctx .peer_discovery - .active_datanodes(Some(is_datanode_accept_ingest_workload)) + .active_datanodes(opts.workload_filter) .await .context(ListActiveDatanodesSnafu)?; diff --git a/src/meta-srv/src/selector/load_based.rs b/src/meta-srv/src/selector/load_based.rs index b08e52ede8..02c8581522 100644 --- a/src/meta-srv/src/selector/load_based.rs +++ b/src/meta-srv/src/selector/load_based.rs @@ -20,7 +20,6 @@ use common_telemetry::debug; use snafu::ResultExt; use crate::cluster::MetaPeerClientRef; -use crate::discovery::utils::is_datanode_accept_ingest_workload; use crate::error::{ListActiveDatanodesSnafu, Result}; use crate::metasrv::SelectorContext; use crate::selector::common::{choose_items, filter_out_excluded_peers}; @@ -54,7 +53,7 @@ where // 1. get alive datanodes. let alive_datanodes = ctx .peer_discovery - .active_datanodes(Some(is_datanode_accept_ingest_workload)) + .active_datanodes(opts.workload_filter) .await .context(ListActiveDatanodesSnafu)?; diff --git a/src/meta-srv/src/selector/round_robin.rs b/src/meta-srv/src/selector/round_robin.rs index 853173b874..0f8727fba6 100644 --- a/src/meta-srv/src/selector/round_robin.rs +++ b/src/meta-srv/src/selector/round_robin.rs @@ -17,7 +17,6 @@ use std::sync::atomic::AtomicUsize; use common_meta::peer::Peer; use snafu::{ResultExt, ensure}; -use crate::discovery::utils::is_datanode_accept_ingest_workload; use crate::error::{ ListActiveDatanodesSnafu, ListActiveFlownodesSnafu, NoEnoughAvailableNodeSnafu, Result, }; @@ -59,7 +58,7 @@ impl RoundRobinSelector { // 1. get alive datanodes. let alive_datanodes = ctx .peer_discovery - .active_datanodes(Some(is_datanode_accept_ingest_workload)) + .active_datanodes(opts.workload_filter) .await .context(ListActiveDatanodesSnafu)?; @@ -71,7 +70,7 @@ impl RoundRobinSelector { } SelectTarget::Flownode => ctx .peer_discovery - .active_flownodes(None) + .active_flownodes(opts.workload_filter) .await .context(ListActiveFlownodesSnafu)?, }; @@ -150,6 +149,7 @@ mod test { min_required_items: 4, allow_duplication: true, exclude_peer_ids: HashSet::new(), + workload_filter: None, }, ) .await @@ -167,6 +167,7 @@ mod test { min_required_items: 2, allow_duplication: true, exclude_peer_ids: HashSet::new(), + workload_filter: None, }, ) .await @@ -208,6 +209,7 @@ mod test { min_required_items: 1, allow_duplication: true, exclude_peer_ids: HashSet::from([2, 5]), + workload_filter: None, }, ) .await