From a533ac2555eaf2b41a27e48e6355138e3230a607 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Thu, 24 Apr 2025 10:27:27 +0800 Subject: [PATCH] feat: enhance selector with node exclusion support (#5966) --- src/meta-srv/src/bootstrap.rs | 21 ++++++-- src/meta-srv/src/metasrv/builder.rs | 12 +++-- src/meta-srv/src/node_excluder.rs | 6 +++ src/meta-srv/src/selector.rs | 2 +- src/meta-srv/src/selector/lease_based.rs | 31 ++++++++++- src/meta-srv/src/selector/load_based.rs | 22 ++++++-- src/meta-srv/src/selector/round_robin.rs | 66 ++++++++++++++++++++---- 7 files changed, 136 insertions(+), 24 deletions(-) diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 3b27295301..40e41bb815 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -66,10 +66,12 @@ use crate::election::postgres::PgElection; #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))] use crate::election::CANDIDATE_LEASE_SECS; use crate::metasrv::builder::MetasrvBuilder; -use crate::metasrv::{BackendImpl, Metasrv, MetasrvOptions, SelectorRef}; +use crate::metasrv::{BackendImpl, Metasrv, MetasrvOptions, SelectTarget, SelectorRef}; +use crate::node_excluder::NodeExcluderRef; use crate::selector::lease_based::LeaseBasedSelector; use crate::selector::load_based::LoadBasedSelector; use crate::selector::round_robin::RoundRobinSelector; +use crate::selector::weight_compute::RegionNumsBasedWeightCompute; use crate::selector::SelectorType; use crate::service::admin; use crate::{error, Result}; @@ -294,14 +296,25 @@ pub async fn metasrv_builder( let in_memory = Arc::new(MemoryKvBackend::new()) as ResettableKvBackendRef; + let node_excluder = plugins + .get::() + .unwrap_or_else(|| Arc::new(Vec::new()) as NodeExcluderRef); let selector = if let Some(selector) = plugins.get::() { info!("Using selector from plugins"); selector } else { let selector = match opts.selector { - SelectorType::LoadBased => Arc::new(LoadBasedSelector::default()) as SelectorRef, - SelectorType::LeaseBased => Arc::new(LeaseBasedSelector) as SelectorRef, - SelectorType::RoundRobin => Arc::new(RoundRobinSelector::default()) as SelectorRef, + SelectorType::LoadBased => Arc::new(LoadBasedSelector::new( + RegionNumsBasedWeightCompute, + node_excluder, + )) as SelectorRef, + SelectorType::LeaseBased => { + Arc::new(LeaseBasedSelector::new(node_excluder)) as SelectorRef + } + SelectorType::RoundRobin => Arc::new(RoundRobinSelector::new( + SelectTarget::Datanode, + node_excluder, + )) as SelectorRef, }; info!( "Using selector from options, selector type: {}", diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index ec8f6ef253..02f835e226 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -190,7 +190,7 @@ impl MetasrvBuilder { let meta_peer_client = meta_peer_client .unwrap_or_else(|| build_default_meta_peer_client(&election, &in_memory)); - let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector)); + let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector::default())); let pushers = Pushers::default(); let mailbox = build_mailbox(&kv_backend, &pushers); let procedure_manager = build_procedure_manager(&options, &kv_backend); @@ -234,13 +234,17 @@ impl MetasrvBuilder { )) }); + let flow_selector = Arc::new(RoundRobinSelector::new( + SelectTarget::Flownode, + Arc::new(Vec::new()), + )) as SelectorRef; + let flow_metadata_allocator = { // for now flownode just use round-robin selector - let flow_selector = RoundRobinSelector::new(SelectTarget::Flownode); let flow_selector_ctx = selector_ctx.clone(); let peer_allocator = Arc::new(FlowPeerAllocator::new( flow_selector_ctx, - Arc::new(flow_selector), + flow_selector.clone(), )); let seq = Arc::new( SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone()) @@ -420,7 +424,7 @@ impl MetasrvBuilder { meta_peer_client: meta_peer_client.clone(), selector, // TODO(jeremy): We do not allow configuring the flow selector. - flow_selector: Arc::new(RoundRobinSelector::new(SelectTarget::Flownode)), + flow_selector, handler_group: RwLock::new(None), handler_group_builder: Mutex::new(Some(handler_group_builder)), election, diff --git a/src/meta-srv/src/node_excluder.rs b/src/meta-srv/src/node_excluder.rs index f9e892f092..a7bc6e0f69 100644 --- a/src/meta-srv/src/node_excluder.rs +++ b/src/meta-srv/src/node_excluder.rs @@ -24,3 +24,9 @@ pub trait NodeExcluder: Send + Sync { /// Returns the excluded datanode ids. fn excluded_datanode_ids(&self) -> &Vec; } + +impl NodeExcluder for Vec { + fn excluded_datanode_ids(&self) -> &Vec { + self + } +} diff --git a/src/meta-srv/src/selector.rs b/src/meta-srv/src/selector.rs index ce166ae05c..96fbda241d 100644 --- a/src/meta-srv/src/selector.rs +++ b/src/meta-srv/src/selector.rs @@ -18,7 +18,7 @@ pub mod load_based; pub mod round_robin; #[cfg(test)] pub(crate) mod test_utils; -mod weight_compute; +pub mod weight_compute; pub mod weighted_choose; use std::collections::HashSet; diff --git a/src/meta-srv/src/selector/lease_based.rs b/src/meta-srv/src/selector/lease_based.rs index e60157989d..448c26b08e 100644 --- a/src/meta-srv/src/selector/lease_based.rs +++ b/src/meta-srv/src/selector/lease_based.rs @@ -12,17 +12,37 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; +use std::sync::Arc; + use common_meta::peer::Peer; use crate::error::Result; use crate::lease; use crate::metasrv::SelectorContext; +use crate::node_excluder::NodeExcluderRef; use crate::selector::common::{choose_items, filter_out_excluded_peers}; use crate::selector::weighted_choose::{RandomWeightedChoose, WeightedItem}; use crate::selector::{Selector, SelectorOptions}; /// Select all alive datanodes based using a random weighted choose. -pub struct LeaseBasedSelector; +pub struct LeaseBasedSelector { + node_excluder: NodeExcluderRef, +} + +impl LeaseBasedSelector { + pub fn new(node_excluder: NodeExcluderRef) -> Self { + Self { node_excluder } + } +} + +impl Default for LeaseBasedSelector { + fn default() -> Self { + Self { + node_excluder: Arc::new(Vec::new()), + } + } +} #[async_trait::async_trait] impl Selector for LeaseBasedSelector { @@ -47,7 +67,14 @@ impl Selector for LeaseBasedSelector { .collect(); // 3. choose peers by weight_array. - filter_out_excluded_peers(&mut weight_array, &opts.exclude_peer_ids); + let mut exclude_peer_ids = self + .node_excluder + .excluded_datanode_ids() + .iter() + .cloned() + .collect::>(); + exclude_peer_ids.extend(opts.exclude_peer_ids.iter()); + filter_out_excluded_peers(&mut weight_array, &exclude_peer_ids); let mut weighted_choose = RandomWeightedChoose::new(weight_array); let selected = choose_items(&opts, &mut weighted_choose)?; diff --git a/src/meta-srv/src/selector/load_based.rs b/src/meta-srv/src/selector/load_based.rs index d98a4ace5d..4f33245a28 100644 --- a/src/meta-srv/src/selector/load_based.rs +++ b/src/meta-srv/src/selector/load_based.rs @@ -12,7 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue}; use common_meta::key::TableMetadataManager; @@ -26,6 +27,7 @@ use crate::error::{self, Result}; use crate::key::{DatanodeLeaseKey, LeaseValue}; use crate::lease; use crate::metasrv::SelectorContext; +use crate::node_excluder::NodeExcluderRef; use crate::selector::common::{choose_items, filter_out_excluded_peers}; use crate::selector::weight_compute::{RegionNumsBasedWeightCompute, WeightCompute}; use crate::selector::weighted_choose::RandomWeightedChoose; @@ -33,11 +35,15 @@ use crate::selector::{Selector, SelectorOptions}; pub struct LoadBasedSelector { weight_compute: C, + node_excluder: NodeExcluderRef, } impl LoadBasedSelector { - pub fn new(weight_compute: C) -> Self { - Self { weight_compute } + pub fn new(weight_compute: C, node_excluder: NodeExcluderRef) -> Self { + Self { + weight_compute, + node_excluder, + } } } @@ -45,6 +51,7 @@ impl Default for LoadBasedSelector { fn default() -> Self { Self { weight_compute: RegionNumsBasedWeightCompute, + node_excluder: Arc::new(Vec::new()), } } } @@ -88,7 +95,14 @@ where let mut weight_array = self.weight_compute.compute(&stat_kvs); // 5. choose peers by weight_array. - filter_out_excluded_peers(&mut weight_array, &opts.exclude_peer_ids); + let mut exclude_peer_ids = self + .node_excluder + .excluded_datanode_ids() + .iter() + .cloned() + .collect::>(); + exclude_peer_ids.extend(opts.exclude_peer_ids.iter()); + filter_out_excluded_peers(&mut weight_array, &exclude_peer_ids); let mut weighted_choose = RandomWeightedChoose::new(weight_array); let selected = choose_items(&opts, &mut weighted_choose)?; diff --git a/src/meta-srv/src/selector/round_robin.rs b/src/meta-srv/src/selector/round_robin.rs index 2c849cb194..d930ca06cb 100644 --- a/src/meta-srv/src/selector/round_robin.rs +++ b/src/meta-srv/src/selector/round_robin.rs @@ -12,7 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; use std::sync::atomic::AtomicUsize; +use std::sync::Arc; use common_meta::peer::Peer; use snafu::ensure; @@ -20,6 +22,7 @@ use snafu::ensure; use crate::error::{NoEnoughAvailableNodeSnafu, Result}; use crate::lease; use crate::metasrv::{SelectTarget, SelectorContext}; +use crate::node_excluder::NodeExcluderRef; use crate::selector::{Selector, SelectorOptions}; /// Round-robin selector that returns the next peer in the list in sequence. @@ -32,6 +35,7 @@ use crate::selector::{Selector, SelectorOptions}; pub struct RoundRobinSelector { select_target: SelectTarget, counter: AtomicUsize, + node_excluder: NodeExcluderRef, } impl Default for RoundRobinSelector { @@ -39,32 +43,38 @@ impl Default for RoundRobinSelector { Self { select_target: SelectTarget::Datanode, counter: AtomicUsize::new(0), + node_excluder: Arc::new(Vec::new()), } } } impl RoundRobinSelector { - pub fn new(select_target: SelectTarget) -> Self { + pub fn new(select_target: SelectTarget, node_excluder: NodeExcluderRef) -> Self { Self { select_target, + node_excluder, ..Default::default() } } - async fn get_peers( - &self, - min_required_items: usize, - ctx: &SelectorContext, - ) -> Result> { + async fn get_peers(&self, opts: &SelectorOptions, ctx: &SelectorContext) -> Result> { let mut peers = match self.select_target { SelectTarget::Datanode => { // 1. get alive datanodes. let lease_kvs = lease::alive_datanodes(&ctx.meta_peer_client, ctx.datanode_lease_secs).await?; + let mut exclude_peer_ids = self + .node_excluder + .excluded_datanode_ids() + .iter() + .cloned() + .collect::>(); + exclude_peer_ids.extend(opts.exclude_peer_ids.iter()); // 2. map into peers lease_kvs .into_iter() + .filter(|(k, _)| !exclude_peer_ids.contains(&k.node_id)) .map(|(k, v)| Peer::new(k.node_id, v.node_addr)) .collect::>() } @@ -84,8 +94,8 @@ impl RoundRobinSelector { ensure!( !peers.is_empty(), NoEnoughAvailableNodeSnafu { - required: min_required_items, - available: 0usize, + required: opts.min_required_items, + available: peers.len(), select_target: self.select_target } ); @@ -103,7 +113,7 @@ impl Selector for RoundRobinSelector { type Output = Vec; async fn select(&self, ctx: &Self::Context, opts: SelectorOptions) -> Result> { - let peers = self.get_peers(opts.min_required_items, ctx).await?; + let peers = self.get_peers(&opts, ctx).await?; // choose peers let mut selected = Vec::with_capacity(opts.min_required_items); for _ in 0..opts.min_required_items { @@ -176,4 +186,42 @@ mod test { assert_eq!(peers.len(), 2); assert_eq!(peers, vec![peer2.clone(), peer3.clone()]); } + + #[tokio::test] + async fn test_round_robin_selector_with_exclude_peer_ids() { + let selector = RoundRobinSelector::new(SelectTarget::Datanode, Arc::new(vec![5])); + let ctx = create_selector_context(); + // add three nodes + let peer1 = Peer { + id: 2, + addr: "node1".to_string(), + }; + let peer2 = Peer { + id: 5, + addr: "node2".to_string(), + }; + let peer3 = Peer { + id: 8, + addr: "node3".to_string(), + }; + put_datanodes( + &ctx.meta_peer_client, + vec![peer1.clone(), peer2.clone(), peer3.clone()], + ) + .await; + + let peers = selector + .select( + &ctx, + SelectorOptions { + min_required_items: 1, + allow_duplication: true, + exclude_peer_ids: HashSet::from([2]), + }, + ) + .await + .unwrap(); + assert_eq!(peers.len(), 1); + assert_eq!(peers, vec![peer3.clone()]); + } }