mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-03 11:52:54 +00:00
feat: enhance selector with node exclusion support (#5966)
This commit is contained in:
@@ -66,10 +66,12 @@ use crate::election::postgres::PgElection;
|
||||
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
|
||||
use crate::election::CANDIDATE_LEASE_SECS;
|
||||
use crate::metasrv::builder::MetasrvBuilder;
|
||||
use crate::metasrv::{BackendImpl, Metasrv, MetasrvOptions, SelectorRef};
|
||||
use crate::metasrv::{BackendImpl, Metasrv, MetasrvOptions, SelectTarget, SelectorRef};
|
||||
use crate::node_excluder::NodeExcluderRef;
|
||||
use crate::selector::lease_based::LeaseBasedSelector;
|
||||
use crate::selector::load_based::LoadBasedSelector;
|
||||
use crate::selector::round_robin::RoundRobinSelector;
|
||||
use crate::selector::weight_compute::RegionNumsBasedWeightCompute;
|
||||
use crate::selector::SelectorType;
|
||||
use crate::service::admin;
|
||||
use crate::{error, Result};
|
||||
@@ -294,14 +296,25 @@ pub async fn metasrv_builder(
|
||||
|
||||
let in_memory = Arc::new(MemoryKvBackend::new()) as ResettableKvBackendRef;
|
||||
|
||||
let node_excluder = plugins
|
||||
.get::<NodeExcluderRef>()
|
||||
.unwrap_or_else(|| Arc::new(Vec::new()) as NodeExcluderRef);
|
||||
let selector = if let Some(selector) = plugins.get::<SelectorRef>() {
|
||||
info!("Using selector from plugins");
|
||||
selector
|
||||
} else {
|
||||
let selector = match opts.selector {
|
||||
SelectorType::LoadBased => Arc::new(LoadBasedSelector::default()) as SelectorRef,
|
||||
SelectorType::LeaseBased => Arc::new(LeaseBasedSelector) as SelectorRef,
|
||||
SelectorType::RoundRobin => Arc::new(RoundRobinSelector::default()) as SelectorRef,
|
||||
SelectorType::LoadBased => Arc::new(LoadBasedSelector::new(
|
||||
RegionNumsBasedWeightCompute,
|
||||
node_excluder,
|
||||
)) as SelectorRef,
|
||||
SelectorType::LeaseBased => {
|
||||
Arc::new(LeaseBasedSelector::new(node_excluder)) as SelectorRef
|
||||
}
|
||||
SelectorType::RoundRobin => Arc::new(RoundRobinSelector::new(
|
||||
SelectTarget::Datanode,
|
||||
node_excluder,
|
||||
)) as SelectorRef,
|
||||
};
|
||||
info!(
|
||||
"Using selector from options, selector type: {}",
|
||||
|
||||
@@ -190,7 +190,7 @@ impl MetasrvBuilder {
|
||||
|
||||
let meta_peer_client = meta_peer_client
|
||||
.unwrap_or_else(|| build_default_meta_peer_client(&election, &in_memory));
|
||||
let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector));
|
||||
let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector::default()));
|
||||
let pushers = Pushers::default();
|
||||
let mailbox = build_mailbox(&kv_backend, &pushers);
|
||||
let procedure_manager = build_procedure_manager(&options, &kv_backend);
|
||||
@@ -234,13 +234,17 @@ impl MetasrvBuilder {
|
||||
))
|
||||
});
|
||||
|
||||
let flow_selector = Arc::new(RoundRobinSelector::new(
|
||||
SelectTarget::Flownode,
|
||||
Arc::new(Vec::new()),
|
||||
)) as SelectorRef;
|
||||
|
||||
let flow_metadata_allocator = {
|
||||
// for now flownode just use round-robin selector
|
||||
let flow_selector = RoundRobinSelector::new(SelectTarget::Flownode);
|
||||
let flow_selector_ctx = selector_ctx.clone();
|
||||
let peer_allocator = Arc::new(FlowPeerAllocator::new(
|
||||
flow_selector_ctx,
|
||||
Arc::new(flow_selector),
|
||||
flow_selector.clone(),
|
||||
));
|
||||
let seq = Arc::new(
|
||||
SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone())
|
||||
@@ -420,7 +424,7 @@ impl MetasrvBuilder {
|
||||
meta_peer_client: meta_peer_client.clone(),
|
||||
selector,
|
||||
// TODO(jeremy): We do not allow configuring the flow selector.
|
||||
flow_selector: Arc::new(RoundRobinSelector::new(SelectTarget::Flownode)),
|
||||
flow_selector,
|
||||
handler_group: RwLock::new(None),
|
||||
handler_group_builder: Mutex::new(Some(handler_group_builder)),
|
||||
election,
|
||||
|
||||
@@ -24,3 +24,9 @@ pub trait NodeExcluder: Send + Sync {
|
||||
/// Returns the excluded datanode ids.
|
||||
fn excluded_datanode_ids(&self) -> &Vec<DatanodeId>;
|
||||
}
|
||||
|
||||
impl NodeExcluder for Vec<DatanodeId> {
|
||||
fn excluded_datanode_ids(&self) -> &Vec<DatanodeId> {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,7 +18,7 @@ pub mod load_based;
|
||||
pub mod round_robin;
|
||||
#[cfg(test)]
|
||||
pub(crate) mod test_utils;
|
||||
mod weight_compute;
|
||||
pub mod weight_compute;
|
||||
pub mod weighted_choose;
|
||||
use std::collections::HashSet;
|
||||
|
||||
|
||||
@@ -12,17 +12,37 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_meta::peer::Peer;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::lease;
|
||||
use crate::metasrv::SelectorContext;
|
||||
use crate::node_excluder::NodeExcluderRef;
|
||||
use crate::selector::common::{choose_items, filter_out_excluded_peers};
|
||||
use crate::selector::weighted_choose::{RandomWeightedChoose, WeightedItem};
|
||||
use crate::selector::{Selector, SelectorOptions};
|
||||
|
||||
/// Select all alive datanodes based using a random weighted choose.
|
||||
pub struct LeaseBasedSelector;
|
||||
pub struct LeaseBasedSelector {
|
||||
node_excluder: NodeExcluderRef,
|
||||
}
|
||||
|
||||
impl LeaseBasedSelector {
|
||||
pub fn new(node_excluder: NodeExcluderRef) -> Self {
|
||||
Self { node_excluder }
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for LeaseBasedSelector {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
node_excluder: Arc::new(Vec::new()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl Selector for LeaseBasedSelector {
|
||||
@@ -47,7 +67,14 @@ impl Selector for LeaseBasedSelector {
|
||||
.collect();
|
||||
|
||||
// 3. choose peers by weight_array.
|
||||
filter_out_excluded_peers(&mut weight_array, &opts.exclude_peer_ids);
|
||||
let mut exclude_peer_ids = self
|
||||
.node_excluder
|
||||
.excluded_datanode_ids()
|
||||
.iter()
|
||||
.cloned()
|
||||
.collect::<HashSet<_>>();
|
||||
exclude_peer_ids.extend(opts.exclude_peer_ids.iter());
|
||||
filter_out_excluded_peers(&mut weight_array, &exclude_peer_ids);
|
||||
let mut weighted_choose = RandomWeightedChoose::new(weight_array);
|
||||
let selected = choose_items(&opts, &mut weighted_choose)?;
|
||||
|
||||
|
||||
@@ -12,7 +12,8 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_meta::datanode::{DatanodeStatKey, DatanodeStatValue};
|
||||
use common_meta::key::TableMetadataManager;
|
||||
@@ -26,6 +27,7 @@ use crate::error::{self, Result};
|
||||
use crate::key::{DatanodeLeaseKey, LeaseValue};
|
||||
use crate::lease;
|
||||
use crate::metasrv::SelectorContext;
|
||||
use crate::node_excluder::NodeExcluderRef;
|
||||
use crate::selector::common::{choose_items, filter_out_excluded_peers};
|
||||
use crate::selector::weight_compute::{RegionNumsBasedWeightCompute, WeightCompute};
|
||||
use crate::selector::weighted_choose::RandomWeightedChoose;
|
||||
@@ -33,11 +35,15 @@ use crate::selector::{Selector, SelectorOptions};
|
||||
|
||||
pub struct LoadBasedSelector<C> {
|
||||
weight_compute: C,
|
||||
node_excluder: NodeExcluderRef,
|
||||
}
|
||||
|
||||
impl<C> LoadBasedSelector<C> {
|
||||
pub fn new(weight_compute: C) -> Self {
|
||||
Self { weight_compute }
|
||||
pub fn new(weight_compute: C, node_excluder: NodeExcluderRef) -> Self {
|
||||
Self {
|
||||
weight_compute,
|
||||
node_excluder,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -45,6 +51,7 @@ impl Default for LoadBasedSelector<RegionNumsBasedWeightCompute> {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
weight_compute: RegionNumsBasedWeightCompute,
|
||||
node_excluder: Arc::new(Vec::new()),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -88,7 +95,14 @@ where
|
||||
let mut weight_array = self.weight_compute.compute(&stat_kvs);
|
||||
|
||||
// 5. choose peers by weight_array.
|
||||
filter_out_excluded_peers(&mut weight_array, &opts.exclude_peer_ids);
|
||||
let mut exclude_peer_ids = self
|
||||
.node_excluder
|
||||
.excluded_datanode_ids()
|
||||
.iter()
|
||||
.cloned()
|
||||
.collect::<HashSet<_>>();
|
||||
exclude_peer_ids.extend(opts.exclude_peer_ids.iter());
|
||||
filter_out_excluded_peers(&mut weight_array, &exclude_peer_ids);
|
||||
let mut weighted_choose = RandomWeightedChoose::new(weight_array);
|
||||
let selected = choose_items(&opts, &mut weighted_choose)?;
|
||||
|
||||
|
||||
@@ -12,7 +12,9 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_meta::peer::Peer;
|
||||
use snafu::ensure;
|
||||
@@ -20,6 +22,7 @@ use snafu::ensure;
|
||||
use crate::error::{NoEnoughAvailableNodeSnafu, Result};
|
||||
use crate::lease;
|
||||
use crate::metasrv::{SelectTarget, SelectorContext};
|
||||
use crate::node_excluder::NodeExcluderRef;
|
||||
use crate::selector::{Selector, SelectorOptions};
|
||||
|
||||
/// Round-robin selector that returns the next peer in the list in sequence.
|
||||
@@ -32,6 +35,7 @@ use crate::selector::{Selector, SelectorOptions};
|
||||
pub struct RoundRobinSelector {
|
||||
select_target: SelectTarget,
|
||||
counter: AtomicUsize,
|
||||
node_excluder: NodeExcluderRef,
|
||||
}
|
||||
|
||||
impl Default for RoundRobinSelector {
|
||||
@@ -39,32 +43,38 @@ impl Default for RoundRobinSelector {
|
||||
Self {
|
||||
select_target: SelectTarget::Datanode,
|
||||
counter: AtomicUsize::new(0),
|
||||
node_excluder: Arc::new(Vec::new()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl RoundRobinSelector {
|
||||
pub fn new(select_target: SelectTarget) -> Self {
|
||||
pub fn new(select_target: SelectTarget, node_excluder: NodeExcluderRef) -> Self {
|
||||
Self {
|
||||
select_target,
|
||||
node_excluder,
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_peers(
|
||||
&self,
|
||||
min_required_items: usize,
|
||||
ctx: &SelectorContext,
|
||||
) -> Result<Vec<Peer>> {
|
||||
async fn get_peers(&self, opts: &SelectorOptions, ctx: &SelectorContext) -> Result<Vec<Peer>> {
|
||||
let mut peers = match self.select_target {
|
||||
SelectTarget::Datanode => {
|
||||
// 1. get alive datanodes.
|
||||
let lease_kvs =
|
||||
lease::alive_datanodes(&ctx.meta_peer_client, ctx.datanode_lease_secs).await?;
|
||||
|
||||
let mut exclude_peer_ids = self
|
||||
.node_excluder
|
||||
.excluded_datanode_ids()
|
||||
.iter()
|
||||
.cloned()
|
||||
.collect::<HashSet<_>>();
|
||||
exclude_peer_ids.extend(opts.exclude_peer_ids.iter());
|
||||
// 2. map into peers
|
||||
lease_kvs
|
||||
.into_iter()
|
||||
.filter(|(k, _)| !exclude_peer_ids.contains(&k.node_id))
|
||||
.map(|(k, v)| Peer::new(k.node_id, v.node_addr))
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
@@ -84,8 +94,8 @@ impl RoundRobinSelector {
|
||||
ensure!(
|
||||
!peers.is_empty(),
|
||||
NoEnoughAvailableNodeSnafu {
|
||||
required: min_required_items,
|
||||
available: 0usize,
|
||||
required: opts.min_required_items,
|
||||
available: peers.len(),
|
||||
select_target: self.select_target
|
||||
}
|
||||
);
|
||||
@@ -103,7 +113,7 @@ impl Selector for RoundRobinSelector {
|
||||
type Output = Vec<Peer>;
|
||||
|
||||
async fn select(&self, ctx: &Self::Context, opts: SelectorOptions) -> Result<Vec<Peer>> {
|
||||
let peers = self.get_peers(opts.min_required_items, ctx).await?;
|
||||
let peers = self.get_peers(&opts, ctx).await?;
|
||||
// choose peers
|
||||
let mut selected = Vec::with_capacity(opts.min_required_items);
|
||||
for _ in 0..opts.min_required_items {
|
||||
@@ -176,4 +186,42 @@ mod test {
|
||||
assert_eq!(peers.len(), 2);
|
||||
assert_eq!(peers, vec![peer2.clone(), peer3.clone()]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_round_robin_selector_with_exclude_peer_ids() {
|
||||
let selector = RoundRobinSelector::new(SelectTarget::Datanode, Arc::new(vec![5]));
|
||||
let ctx = create_selector_context();
|
||||
// add three nodes
|
||||
let peer1 = Peer {
|
||||
id: 2,
|
||||
addr: "node1".to_string(),
|
||||
};
|
||||
let peer2 = Peer {
|
||||
id: 5,
|
||||
addr: "node2".to_string(),
|
||||
};
|
||||
let peer3 = Peer {
|
||||
id: 8,
|
||||
addr: "node3".to_string(),
|
||||
};
|
||||
put_datanodes(
|
||||
&ctx.meta_peer_client,
|
||||
vec![peer1.clone(), peer2.clone(), peer3.clone()],
|
||||
)
|
||||
.await;
|
||||
|
||||
let peers = selector
|
||||
.select(
|
||||
&ctx,
|
||||
SelectorOptions {
|
||||
min_required_items: 1,
|
||||
allow_duplication: true,
|
||||
exclude_peer_ids: HashSet::from([2]),
|
||||
},
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(peers.len(), 1);
|
||||
assert_eq!(peers, vec![peer3.clone()]);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user