feat: add exclude_peer_ids to SelectorOptions (#5949)

* feat: add `exclude_peer_ids` to `SelectorOptions`

* chore: apply suggestions from CR

* fix: clippy
This commit is contained in:
Weny Xu
2025-04-22 15:49:22 +08:00
committed by GitHub
parent 0a4594c9e2
commit 0f77135ef9
9 changed files with 71 additions and 8 deletions

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use common_error::ext::BoxedError;
use common_meta::ddl::flow_meta::PartitionPeerAllocator;
use common_meta::peer::Peer;
@@ -40,6 +42,7 @@ impl PartitionPeerAllocator for FlowPeerAllocator {
SelectorOptions {
min_required_items: partitions,
allow_duplication: true,
exclude_peer_ids: HashSet::new(),
},
)
.await

View File

@@ -58,6 +58,9 @@ use crate::error::{self, Result};
use crate::metrics::{METRIC_META_REGION_MIGRATION_ERROR, METRIC_META_REGION_MIGRATION_EXECUTE};
use crate::service::mailbox::MailboxRef;
/// The default timeout for region migration.
pub const DEFAULT_REGION_MIGRATION_TIMEOUT: Duration = Duration::from_secs(120);
/// It's shared in each step and available even after recovering.
///
/// It will only be updated/stored after the Red node has succeeded.

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use std::time::Duration;
@@ -36,7 +37,9 @@ use crate::error::{self, Result};
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::metasrv::{SelectorContext, SelectorRef};
use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
use crate::procedure::region_migration::RegionMigrationProcedureTask;
use crate::procedure::region_migration::{
RegionMigrationProcedureTask, DEFAULT_REGION_MIGRATION_TIMEOUT,
};
use crate::region::failure_detector::RegionFailureDetector;
use crate::selector::SelectorOptions;
@@ -401,6 +404,7 @@ impl RegionSupervisor {
SelectorOptions {
min_required_items: 1,
allow_duplication: false,
exclude_peer_ids: HashSet::from([from_peer.id]),
},
)
.await?;
@@ -415,7 +419,7 @@ impl RegionSupervisor {
region_id,
from_peer,
to_peer,
timeout: Duration::from_secs(60),
timeout: DEFAULT_REGION_MIGRATION_TIMEOUT,
};
if let Err(err) = self.region_migration_manager.submit_procedure(task).await {

View File

@@ -20,6 +20,8 @@ pub mod round_robin;
pub(crate) mod test_utils;
mod weight_compute;
pub mod weighted_choose;
use std::collections::HashSet;
use serde::{Deserialize, Serialize};
use strum::AsRefStr;
@@ -40,6 +42,8 @@ pub struct SelectorOptions {
pub min_required_items: usize,
/// Whether duplicates are allowed in the selected result, default false.
pub allow_duplication: bool,
/// The peers to exclude from the selection.
pub exclude_peer_ids: HashSet<u64>,
}
impl Default for SelectorOptions {
@@ -47,6 +51,7 @@ impl Default for SelectorOptions {
Self {
min_required_items: 1,
allow_duplication: false,
exclude_peer_ids: HashSet::new(),
}
}
}

View File

@@ -12,15 +12,25 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use common_meta::peer::Peer;
use snafu::ensure;
use crate::error;
use crate::error::Result;
use crate::metasrv::SelectTarget;
use crate::selector::weighted_choose::WeightedChoose;
use crate::selector::weighted_choose::{WeightedChoose, WeightedItem};
use crate::selector::SelectorOptions;
/// Filter out the excluded peers from the `weight_array`.
pub fn filter_out_excluded_peers(
weight_array: &mut Vec<WeightedItem<Peer>>,
exclude_peer_ids: &HashSet<u64>,
) {
weight_array.retain(|peer| !exclude_peer_ids.contains(&peer.item.id));
}
/// According to the `opts`, choose peers from the `weight_array` through `weighted_choose`.
pub fn choose_items<W>(opts: &SelectorOptions, weighted_choose: &mut W) -> Result<Vec<Peer>>
where
@@ -80,7 +90,7 @@ mod tests {
use common_meta::peer::Peer;
use crate::selector::common::choose_items;
use crate::selector::common::{choose_items, filter_out_excluded_peers};
use crate::selector::weighted_choose::{RandomWeightedChoose, WeightedItem};
use crate::selector::SelectorOptions;
@@ -128,6 +138,7 @@ mod tests {
let opts = SelectorOptions {
min_required_items: i,
allow_duplication: false,
exclude_peer_ids: HashSet::new(),
};
let selected_peers: HashSet<_> =
@@ -142,6 +153,7 @@ mod tests {
let opts = SelectorOptions {
min_required_items: 6,
allow_duplication: false,
exclude_peer_ids: HashSet::new(),
};
let selected_result =
@@ -152,6 +164,7 @@ mod tests {
let opts = SelectorOptions {
min_required_items: i,
allow_duplication: true,
exclude_peer_ids: HashSet::new(),
};
let selected_peers =
@@ -160,4 +173,30 @@ mod tests {
assert_eq!(i, selected_peers.len());
}
}
#[test]
fn test_filter_out_excluded_peers() {
let mut weight_array = vec![
WeightedItem {
item: Peer {
id: 1,
addr: "127.0.0.1:3001".to_string(),
},
weight: 1.0,
},
WeightedItem {
item: Peer {
id: 2,
addr: "127.0.0.1:3002".to_string(),
},
weight: 1.0,
},
];
let exclude_peer_ids = HashSet::from([1]);
filter_out_excluded_peers(&mut weight_array, &exclude_peer_ids);
assert_eq!(weight_array.len(), 1);
assert_eq!(weight_array[0].item.id, 2);
}
}

View File

@@ -17,7 +17,7 @@ use common_meta::peer::Peer;
use crate::error::Result;
use crate::lease;
use crate::metasrv::SelectorContext;
use crate::selector::common::choose_items;
use crate::selector::common::{choose_items, filter_out_excluded_peers};
use crate::selector::weighted_choose::{RandomWeightedChoose, WeightedItem};
use crate::selector::{Selector, SelectorOptions};
@@ -35,7 +35,7 @@ impl Selector for LeaseBasedSelector {
lease::alive_datanodes(&ctx.meta_peer_client, ctx.datanode_lease_secs).await?;
// 2. compute weight array, but the weight of each item is the same.
let weight_array = lease_kvs
let mut weight_array = lease_kvs
.into_iter()
.map(|(k, v)| WeightedItem {
item: Peer {
@@ -47,6 +47,7 @@ impl Selector for LeaseBasedSelector {
.collect();
// 3. choose peers by weight_array.
filter_out_excluded_peers(&mut weight_array, &opts.exclude_peer_ids);
let mut weighted_choose = RandomWeightedChoose::new(weight_array);
let selected = choose_items(&opts, &mut weighted_choose)?;

View File

@@ -26,7 +26,7 @@ use crate::error::{self, Result};
use crate::key::{DatanodeLeaseKey, LeaseValue};
use crate::lease;
use crate::metasrv::SelectorContext;
use crate::selector::common::choose_items;
use crate::selector::common::{choose_items, filter_out_excluded_peers};
use crate::selector::weight_compute::{RegionNumsBasedWeightCompute, WeightCompute};
use crate::selector::weighted_choose::RandomWeightedChoose;
use crate::selector::{Selector, SelectorOptions};
@@ -85,9 +85,10 @@ where
};
// 4. compute weight array.
let weight_array = self.weight_compute.compute(&stat_kvs);
let mut weight_array = self.weight_compute.compute(&stat_kvs);
// 5. choose peers by weight_array.
filter_out_excluded_peers(&mut weight_array, &opts.exclude_peer_ids);
let mut weighted_choose = RandomWeightedChoose::new(weight_array);
let selected = choose_items(&opts, &mut weighted_choose)?;

View File

@@ -120,6 +120,8 @@ impl Selector for RoundRobinSelector {
#[cfg(test)]
mod test {
use std::collections::HashSet;
use super::*;
use crate::test_util::{create_selector_context, put_datanodes};
@@ -149,6 +151,7 @@ mod test {
SelectorOptions {
min_required_items: 4,
allow_duplication: true,
exclude_peer_ids: HashSet::new(),
},
)
.await
@@ -165,6 +168,7 @@ mod test {
SelectorOptions {
min_required_items: 2,
allow_duplication: true,
exclude_peer_ids: HashSet::new(),
},
)
.await

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_meta::ddl::table_meta::PeerAllocator;
@@ -51,6 +53,7 @@ impl MetasrvPeerAllocator {
SelectorOptions {
min_required_items: regions,
allow_duplication: true,
exclude_peer_ids: HashSet::new(),
},
)
.await?;