From 0f77135ef9a33f8d9dd5d70ac99d84a0a04ccc31 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Tue, 22 Apr 2025 15:49:22 +0800 Subject: [PATCH] feat: add `exclude_peer_ids` to `SelectorOptions` (#5949) * feat: add `exclude_peer_ids` to `SelectorOptions` * chore: apply suggestions from CR * fix: clippy --- src/meta-srv/src/flow_meta_alloc.rs | 3 ++ .../src/procedure/region_migration.rs | 3 ++ src/meta-srv/src/region/supervisor.rs | 8 +++- src/meta-srv/src/selector.rs | 5 +++ src/meta-srv/src/selector/common.rs | 43 ++++++++++++++++++- src/meta-srv/src/selector/lease_based.rs | 5 ++- src/meta-srv/src/selector/load_based.rs | 5 ++- src/meta-srv/src/selector/round_robin.rs | 4 ++ src/meta-srv/src/table_meta_alloc.rs | 3 ++ 9 files changed, 71 insertions(+), 8 deletions(-) diff --git a/src/meta-srv/src/flow_meta_alloc.rs b/src/meta-srv/src/flow_meta_alloc.rs index bdfac158aa..5c9dd82301 100644 --- a/src/meta-srv/src/flow_meta_alloc.rs +++ b/src/meta-srv/src/flow_meta_alloc.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; + use common_error::ext::BoxedError; use common_meta::ddl::flow_meta::PartitionPeerAllocator; use common_meta::peer::Peer; @@ -40,6 +42,7 @@ impl PartitionPeerAllocator for FlowPeerAllocator { SelectorOptions { min_required_items: partitions, allow_duplication: true, + exclude_peer_ids: HashSet::new(), }, ) .await diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index a98320f9e7..b2f1eed711 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -58,6 +58,9 @@ use crate::error::{self, Result}; use crate::metrics::{METRIC_META_REGION_MIGRATION_ERROR, METRIC_META_REGION_MIGRATION_EXECUTE}; use crate::service::mailbox::MailboxRef; +/// The default timeout for region migration. +pub const DEFAULT_REGION_MIGRATION_TIMEOUT: Duration = Duration::from_secs(120); + /// It's shared in each step and available even after recovering. /// /// It will only be updated/stored after the Red node has succeeded. diff --git a/src/meta-srv/src/region/supervisor.rs b/src/meta-srv/src/region/supervisor.rs index f37face68f..6318c0d128 100644 --- a/src/meta-srv/src/region/supervisor.rs +++ b/src/meta-srv/src/region/supervisor.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; use std::fmt::Debug; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -36,7 +37,9 @@ use crate::error::{self, Result}; use crate::failure_detector::PhiAccrualFailureDetectorOptions; use crate::metasrv::{SelectorContext, SelectorRef}; use crate::procedure::region_migration::manager::RegionMigrationManagerRef; -use crate::procedure::region_migration::RegionMigrationProcedureTask; +use crate::procedure::region_migration::{ + RegionMigrationProcedureTask, DEFAULT_REGION_MIGRATION_TIMEOUT, +}; use crate::region::failure_detector::RegionFailureDetector; use crate::selector::SelectorOptions; @@ -401,6 +404,7 @@ impl RegionSupervisor { SelectorOptions { min_required_items: 1, allow_duplication: false, + exclude_peer_ids: HashSet::from([from_peer.id]), }, ) .await?; @@ -415,7 +419,7 @@ impl RegionSupervisor { region_id, from_peer, to_peer, - timeout: Duration::from_secs(60), + timeout: DEFAULT_REGION_MIGRATION_TIMEOUT, }; if let Err(err) = self.region_migration_manager.submit_procedure(task).await { diff --git a/src/meta-srv/src/selector.rs b/src/meta-srv/src/selector.rs index 9a3d2ceadd..ce166ae05c 100644 --- a/src/meta-srv/src/selector.rs +++ b/src/meta-srv/src/selector.rs @@ -20,6 +20,8 @@ pub mod round_robin; pub(crate) mod test_utils; mod weight_compute; pub mod weighted_choose; +use std::collections::HashSet; + use serde::{Deserialize, Serialize}; use strum::AsRefStr; @@ -40,6 +42,8 @@ pub struct SelectorOptions { pub min_required_items: usize, /// Whether duplicates are allowed in the selected result, default false. pub allow_duplication: bool, + /// The peers to exclude from the selection. + pub exclude_peer_ids: HashSet, } impl Default for SelectorOptions { @@ -47,6 +51,7 @@ impl Default for SelectorOptions { Self { min_required_items: 1, allow_duplication: false, + exclude_peer_ids: HashSet::new(), } } } diff --git a/src/meta-srv/src/selector/common.rs b/src/meta-srv/src/selector/common.rs index b44043f60e..24c3597c26 100644 --- a/src/meta-srv/src/selector/common.rs +++ b/src/meta-srv/src/selector/common.rs @@ -12,15 +12,25 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; + use common_meta::peer::Peer; use snafu::ensure; use crate::error; use crate::error::Result; use crate::metasrv::SelectTarget; -use crate::selector::weighted_choose::WeightedChoose; +use crate::selector::weighted_choose::{WeightedChoose, WeightedItem}; use crate::selector::SelectorOptions; +/// Filter out the excluded peers from the `weight_array`. +pub fn filter_out_excluded_peers( + weight_array: &mut Vec>, + exclude_peer_ids: &HashSet, +) { + weight_array.retain(|peer| !exclude_peer_ids.contains(&peer.item.id)); +} + /// According to the `opts`, choose peers from the `weight_array` through `weighted_choose`. pub fn choose_items(opts: &SelectorOptions, weighted_choose: &mut W) -> Result> where @@ -80,7 +90,7 @@ mod tests { use common_meta::peer::Peer; - use crate::selector::common::choose_items; + use crate::selector::common::{choose_items, filter_out_excluded_peers}; use crate::selector::weighted_choose::{RandomWeightedChoose, WeightedItem}; use crate::selector::SelectorOptions; @@ -128,6 +138,7 @@ mod tests { let opts = SelectorOptions { min_required_items: i, allow_duplication: false, + exclude_peer_ids: HashSet::new(), }; let selected_peers: HashSet<_> = @@ -142,6 +153,7 @@ mod tests { let opts = SelectorOptions { min_required_items: 6, allow_duplication: false, + exclude_peer_ids: HashSet::new(), }; let selected_result = @@ -152,6 +164,7 @@ mod tests { let opts = SelectorOptions { min_required_items: i, allow_duplication: true, + exclude_peer_ids: HashSet::new(), }; let selected_peers = @@ -160,4 +173,30 @@ mod tests { assert_eq!(i, selected_peers.len()); } } + + #[test] + fn test_filter_out_excluded_peers() { + let mut weight_array = vec![ + WeightedItem { + item: Peer { + id: 1, + addr: "127.0.0.1:3001".to_string(), + }, + weight: 1.0, + }, + WeightedItem { + item: Peer { + id: 2, + addr: "127.0.0.1:3002".to_string(), + }, + weight: 1.0, + }, + ]; + + let exclude_peer_ids = HashSet::from([1]); + filter_out_excluded_peers(&mut weight_array, &exclude_peer_ids); + + assert_eq!(weight_array.len(), 1); + assert_eq!(weight_array[0].item.id, 2); + } } diff --git a/src/meta-srv/src/selector/lease_based.rs b/src/meta-srv/src/selector/lease_based.rs index 770c8aac8a..e60157989d 100644 --- a/src/meta-srv/src/selector/lease_based.rs +++ b/src/meta-srv/src/selector/lease_based.rs @@ -17,7 +17,7 @@ use common_meta::peer::Peer; use crate::error::Result; use crate::lease; use crate::metasrv::SelectorContext; -use crate::selector::common::choose_items; +use crate::selector::common::{choose_items, filter_out_excluded_peers}; use crate::selector::weighted_choose::{RandomWeightedChoose, WeightedItem}; use crate::selector::{Selector, SelectorOptions}; @@ -35,7 +35,7 @@ impl Selector for LeaseBasedSelector { lease::alive_datanodes(&ctx.meta_peer_client, ctx.datanode_lease_secs).await?; // 2. compute weight array, but the weight of each item is the same. - let weight_array = lease_kvs + let mut weight_array = lease_kvs .into_iter() .map(|(k, v)| WeightedItem { item: Peer { @@ -47,6 +47,7 @@ impl Selector for LeaseBasedSelector { .collect(); // 3. choose peers by weight_array. + filter_out_excluded_peers(&mut weight_array, &opts.exclude_peer_ids); let mut weighted_choose = RandomWeightedChoose::new(weight_array); let selected = choose_items(&opts, &mut weighted_choose)?; diff --git a/src/meta-srv/src/selector/load_based.rs b/src/meta-srv/src/selector/load_based.rs index 2628990bf4..d98a4ace5d 100644 --- a/src/meta-srv/src/selector/load_based.rs +++ b/src/meta-srv/src/selector/load_based.rs @@ -26,7 +26,7 @@ use crate::error::{self, Result}; use crate::key::{DatanodeLeaseKey, LeaseValue}; use crate::lease; use crate::metasrv::SelectorContext; -use crate::selector::common::choose_items; +use crate::selector::common::{choose_items, filter_out_excluded_peers}; use crate::selector::weight_compute::{RegionNumsBasedWeightCompute, WeightCompute}; use crate::selector::weighted_choose::RandomWeightedChoose; use crate::selector::{Selector, SelectorOptions}; @@ -85,9 +85,10 @@ where }; // 4. compute weight array. - let weight_array = self.weight_compute.compute(&stat_kvs); + let mut weight_array = self.weight_compute.compute(&stat_kvs); // 5. choose peers by weight_array. + filter_out_excluded_peers(&mut weight_array, &opts.exclude_peer_ids); let mut weighted_choose = RandomWeightedChoose::new(weight_array); let selected = choose_items(&opts, &mut weighted_choose)?; diff --git a/src/meta-srv/src/selector/round_robin.rs b/src/meta-srv/src/selector/round_robin.rs index f11a36555f..2c849cb194 100644 --- a/src/meta-srv/src/selector/round_robin.rs +++ b/src/meta-srv/src/selector/round_robin.rs @@ -120,6 +120,8 @@ impl Selector for RoundRobinSelector { #[cfg(test)] mod test { + use std::collections::HashSet; + use super::*; use crate::test_util::{create_selector_context, put_datanodes}; @@ -149,6 +151,7 @@ mod test { SelectorOptions { min_required_items: 4, allow_duplication: true, + exclude_peer_ids: HashSet::new(), }, ) .await @@ -165,6 +168,7 @@ mod test { SelectorOptions { min_required_items: 2, allow_duplication: true, + exclude_peer_ids: HashSet::new(), }, ) .await diff --git a/src/meta-srv/src/table_meta_alloc.rs b/src/meta-srv/src/table_meta_alloc.rs index 8578e6cd19..54dd34ea86 100644 --- a/src/meta-srv/src/table_meta_alloc.rs +++ b/src/meta-srv/src/table_meta_alloc.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; + use async_trait::async_trait; use common_error::ext::BoxedError; use common_meta::ddl::table_meta::PeerAllocator; @@ -51,6 +53,7 @@ impl MetasrvPeerAllocator { SelectorOptions { min_required_items: regions, allow_duplication: true, + exclude_peer_ids: HashSet::new(), }, ) .await?;