From 7a8e1bc3f993a109fcd47d1a3cb74df26846e291 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Mon, 21 Apr 2025 18:59:24 +0800 Subject: [PATCH] feat: support building `metasrv` with selector from plugins (#5942) * chore: expose selector * feat: use f64 * chore: expose selector::common * feat: build metasrv with selector from plugins --- src/meta-srv/src/bootstrap.rs | 18 ++++++++++++++---- src/meta-srv/src/selector.rs | 7 ++++--- src/meta-srv/src/selector/common.rs | 10 +++++----- src/meta-srv/src/selector/lease_based.rs | 2 +- src/meta-srv/src/selector/weight_compute.rs | 4 ++-- src/meta-srv/src/selector/weighted_choose.rs | 17 ++++++++++------- 6 files changed, 36 insertions(+), 22 deletions(-) diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 33c32443e2..3b27295301 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -294,10 +294,20 @@ pub async fn metasrv_builder( let in_memory = Arc::new(MemoryKvBackend::new()) as ResettableKvBackendRef; - let selector = match opts.selector { - SelectorType::LoadBased => Arc::new(LoadBasedSelector::default()) as SelectorRef, - SelectorType::LeaseBased => Arc::new(LeaseBasedSelector) as SelectorRef, - SelectorType::RoundRobin => Arc::new(RoundRobinSelector::default()) as SelectorRef, + let selector = if let Some(selector) = plugins.get::() { + info!("Using selector from plugins"); + selector + } else { + let selector = match opts.selector { + SelectorType::LoadBased => Arc::new(LoadBasedSelector::default()) as SelectorRef, + SelectorType::LeaseBased => Arc::new(LeaseBasedSelector) as SelectorRef, + SelectorType::RoundRobin => Arc::new(RoundRobinSelector::default()) as SelectorRef, + }; + info!( + "Using selector from options, selector type: {}", + opts.selector.as_ref() + ); + selector }; Ok(MetasrvBuilder::new() diff --git a/src/meta-srv/src/selector.rs b/src/meta-srv/src/selector.rs index c197f04e59..9a3d2ceadd 100644 --- a/src/meta-srv/src/selector.rs +++ b/src/meta-srv/src/selector.rs @@ -12,15 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod common; +pub mod common; pub mod lease_based; pub mod load_based; pub mod round_robin; #[cfg(test)] pub(crate) mod test_utils; mod weight_compute; -mod weighted_choose; +pub mod weighted_choose; use serde::{Deserialize, Serialize}; +use strum::AsRefStr; use crate::error; use crate::error::Result; @@ -51,7 +52,7 @@ impl Default for SelectorOptions { } /// [`SelectorType`] refers to the load balancer used when creating tables. -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)] +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default, AsRefStr)] #[serde(try_from = "String")] pub enum SelectorType { /// The current load balancing is based on the number of regions on each datanode node; diff --git a/src/meta-srv/src/selector/common.rs b/src/meta-srv/src/selector/common.rs index 24352c14c2..b44043f60e 100644 --- a/src/meta-srv/src/selector/common.rs +++ b/src/meta-srv/src/selector/common.rs @@ -92,35 +92,35 @@ mod tests { id: 1, addr: "127.0.0.1:3001".to_string(), }, - weight: 1, + weight: 1.0, }, WeightedItem { item: Peer { id: 2, addr: "127.0.0.1:3001".to_string(), }, - weight: 1, + weight: 1.0, }, WeightedItem { item: Peer { id: 3, addr: "127.0.0.1:3001".to_string(), }, - weight: 1, + weight: 1.0, }, WeightedItem { item: Peer { id: 4, addr: "127.0.0.1:3001".to_string(), }, - weight: 1, + weight: 1.0, }, WeightedItem { item: Peer { id: 5, addr: "127.0.0.1:3001".to_string(), }, - weight: 1, + weight: 1.0, }, ]; diff --git a/src/meta-srv/src/selector/lease_based.rs b/src/meta-srv/src/selector/lease_based.rs index a7ce7c7321..770c8aac8a 100644 --- a/src/meta-srv/src/selector/lease_based.rs +++ b/src/meta-srv/src/selector/lease_based.rs @@ -42,7 +42,7 @@ impl Selector for LeaseBasedSelector { id: k.node_id, addr: v.node_addr.clone(), }, - weight: 1, + weight: 1.0, }) .collect(); diff --git a/src/meta-srv/src/selector/weight_compute.rs b/src/meta-srv/src/selector/weight_compute.rs index 904fc1f4e7..d69e2f56d3 100644 --- a/src/meta-srv/src/selector/weight_compute.rs +++ b/src/meta-srv/src/selector/weight_compute.rs @@ -84,7 +84,7 @@ impl WeightCompute for RegionNumsBasedWeightCompute { .zip(region_nums) .map(|(peer, region_num)| WeightedItem { item: peer, - weight: (max_weight - region_num + base_weight) as usize, + weight: (max_weight - region_num + base_weight) as f64, }) .collect() } @@ -148,7 +148,7 @@ mod tests { 2, ); for weight in weight_array.iter() { - assert_eq!(*expected.get(&weight.item).unwrap(), weight.weight,); + assert_eq!(*expected.get(&weight.item).unwrap(), weight.weight as usize); } let mut expected = HashMap::new(); diff --git a/src/meta-srv/src/selector/weighted_choose.rs b/src/meta-srv/src/selector/weighted_choose.rs index 749df57edf..0890be9137 100644 --- a/src/meta-srv/src/selector/weighted_choose.rs +++ b/src/meta-srv/src/selector/weighted_choose.rs @@ -42,10 +42,10 @@ pub trait WeightedChoose: Send + Sync { } /// The struct represents a weighted item. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq)] pub struct WeightedItem { pub item: Item, - pub weight: usize, + pub weight: f64, } /// A implementation of weighted balance: random weighted choose. @@ -87,7 +87,7 @@ where // unwrap safety: whether weighted_index is none has been checked before. let item = self .items - .choose_weighted(&mut rng(), |item| item.weight as f64) + .choose_weighted(&mut rng(), |item| item.weight) .context(error::ChooseItemsSnafu)? .item .clone(); @@ -95,11 +95,11 @@ where } fn choose_multiple(&mut self, amount: usize) -> Result> { - let amount = amount.min(self.items.iter().filter(|item| item.weight > 0).count()); + let amount = amount.min(self.items.iter().filter(|item| item.weight > 0.0).count()); Ok(self .items - .choose_multiple_weighted(&mut rng(), amount, |item| item.weight as f64) + .choose_multiple_weighted(&mut rng(), amount, |item| item.weight) .context(error::ChooseItemsSnafu)? .cloned() .map(|item| item.item) @@ -120,9 +120,12 @@ mod tests { let mut choose = RandomWeightedChoose::new(vec![ WeightedItem { item: 1, - weight: 100, + weight: 100.0, + }, + WeightedItem { + item: 2, + weight: 0.0, }, - WeightedItem { item: 2, weight: 0 }, ]); for _ in 0..100 {