feat: support building metasrv with selector from plugins (#5942)

* chore: expose selector

* feat: use f64

* chore: expose selector::common

* feat: build metasrv with selector from plugins
This commit is contained in:
Weny Xu
2025-04-21 18:59:24 +08:00
committed by GitHub
parent ee07b9bfa8
commit 7a8e1bc3f9
6 changed files with 36 additions and 22 deletions

View File

@@ -294,10 +294,20 @@ pub async fn metasrv_builder(
let in_memory = Arc::new(MemoryKvBackend::new()) as ResettableKvBackendRef;
let selector = match opts.selector {
SelectorType::LoadBased => Arc::new(LoadBasedSelector::default()) as SelectorRef,
SelectorType::LeaseBased => Arc::new(LeaseBasedSelector) as SelectorRef,
SelectorType::RoundRobin => Arc::new(RoundRobinSelector::default()) as SelectorRef,
let selector = if let Some(selector) = plugins.get::<SelectorRef>() {
info!("Using selector from plugins");
selector
} else {
let selector = match opts.selector {
SelectorType::LoadBased => Arc::new(LoadBasedSelector::default()) as SelectorRef,
SelectorType::LeaseBased => Arc::new(LeaseBasedSelector) as SelectorRef,
SelectorType::RoundRobin => Arc::new(RoundRobinSelector::default()) as SelectorRef,
};
info!(
"Using selector from options, selector type: {}",
opts.selector.as_ref()
);
selector
};
Ok(MetasrvBuilder::new()

View File

@@ -12,15 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod common;
pub mod common;
pub mod lease_based;
pub mod load_based;
pub mod round_robin;
#[cfg(test)]
pub(crate) mod test_utils;
mod weight_compute;
mod weighted_choose;
pub mod weighted_choose;
use serde::{Deserialize, Serialize};
use strum::AsRefStr;
use crate::error;
use crate::error::Result;
@@ -51,7 +52,7 @@ impl Default for SelectorOptions {
}
/// [`SelectorType`] refers to the load balancer used when creating tables.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default, AsRefStr)]
#[serde(try_from = "String")]
pub enum SelectorType {
/// The current load balancing is based on the number of regions on each datanode node;

View File

@@ -92,35 +92,35 @@ mod tests {
id: 1,
addr: "127.0.0.1:3001".to_string(),
},
weight: 1,
weight: 1.0,
},
WeightedItem {
item: Peer {
id: 2,
addr: "127.0.0.1:3001".to_string(),
},
weight: 1,
weight: 1.0,
},
WeightedItem {
item: Peer {
id: 3,
addr: "127.0.0.1:3001".to_string(),
},
weight: 1,
weight: 1.0,
},
WeightedItem {
item: Peer {
id: 4,
addr: "127.0.0.1:3001".to_string(),
},
weight: 1,
weight: 1.0,
},
WeightedItem {
item: Peer {
id: 5,
addr: "127.0.0.1:3001".to_string(),
},
weight: 1,
weight: 1.0,
},
];

View File

@@ -42,7 +42,7 @@ impl Selector for LeaseBasedSelector {
id: k.node_id,
addr: v.node_addr.clone(),
},
weight: 1,
weight: 1.0,
})
.collect();

View File

@@ -84,7 +84,7 @@ impl WeightCompute for RegionNumsBasedWeightCompute {
.zip(region_nums)
.map(|(peer, region_num)| WeightedItem {
item: peer,
weight: (max_weight - region_num + base_weight) as usize,
weight: (max_weight - region_num + base_weight) as f64,
})
.collect()
}
@@ -148,7 +148,7 @@ mod tests {
2,
);
for weight in weight_array.iter() {
assert_eq!(*expected.get(&weight.item).unwrap(), weight.weight,);
assert_eq!(*expected.get(&weight.item).unwrap(), weight.weight as usize);
}
let mut expected = HashMap::new();

View File

@@ -42,10 +42,10 @@ pub trait WeightedChoose<Item>: Send + Sync {
}
/// The struct represents a weighted item.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq)]
pub struct WeightedItem<Item> {
pub item: Item,
pub weight: usize,
pub weight: f64,
}
/// A implementation of weighted balance: random weighted choose.
@@ -87,7 +87,7 @@ where
// unwrap safety: whether weighted_index is none has been checked before.
let item = self
.items
.choose_weighted(&mut rng(), |item| item.weight as f64)
.choose_weighted(&mut rng(), |item| item.weight)
.context(error::ChooseItemsSnafu)?
.item
.clone();
@@ -95,11 +95,11 @@ where
}
fn choose_multiple(&mut self, amount: usize) -> Result<Vec<Item>> {
let amount = amount.min(self.items.iter().filter(|item| item.weight > 0).count());
let amount = amount.min(self.items.iter().filter(|item| item.weight > 0.0).count());
Ok(self
.items
.choose_multiple_weighted(&mut rng(), amount, |item| item.weight as f64)
.choose_multiple_weighted(&mut rng(), amount, |item| item.weight)
.context(error::ChooseItemsSnafu)?
.cloned()
.map(|item| item.item)
@@ -120,9 +120,12 @@ mod tests {
let mut choose = RandomWeightedChoose::new(vec![
WeightedItem {
item: 1,
weight: 100,
weight: 100.0,
},
WeightedItem {
item: 2,
weight: 0.0,
},
WeightedItem { item: 2, weight: 0 },
]);
for _ in 0..100 {