mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-21 15:30:40 +00:00
feat(meta-srv): add selector factory plugin hook (#8140)
* meta-srv: introduce SelectorWrapper to wrap configured selector - `SelectorWrapper trait`: add `SelectorWrapper` trait and `SelectorWrapperRef` in `src/meta-srv/src/metasrv.rs` to support decorating selectors - `metasrv bootstrap`: apply `SelectorWrapperRef` in `src/meta-srv/src/bootstrap.rs` to wrap the configured selector, and add unit tests to verify the behavior Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> * feat(meta-srv): add selector factory plugin hook - `SelectorFactory`: replace selector wrapper registration with a bootstrap-time factory context in `src/meta-srv/src/metasrv.rs` - `metasrv_builder`: build the configured base selector before invoking plugin factories in `src/meta-srv/src/bootstrap.rs` Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> --------- Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
This commit is contained in:
@@ -46,7 +46,8 @@ use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
|
||||
use crate::error::OtherSnafu;
|
||||
use crate::metasrv::builder::MetasrvBuilder;
|
||||
use crate::metasrv::{
|
||||
BackendImpl, ElectionRef, Metasrv, MetasrvOptions, SelectTarget, SelectorRef,
|
||||
BackendImpl, ElectionRef, Metasrv, MetasrvOptions, SelectTarget, SelectorFactoryContext,
|
||||
SelectorFactoryRef, SelectorRef,
|
||||
};
|
||||
use crate::selector::lease_based::LeaseBasedSelector;
|
||||
use crate::selector::load_based::LoadBasedSelector;
|
||||
@@ -381,30 +382,37 @@ pub async fn metasrv_builder(
|
||||
let in_memory = Arc::new(MemoryKvBackend::new()) as ResettableKvBackendRef;
|
||||
let meta_peer_client = build_default_meta_peer_client(&election, &in_memory);
|
||||
|
||||
let selector = if let Some(selector) = plugins.get::<SelectorRef>() {
|
||||
info!("Using selector from plugins");
|
||||
selector
|
||||
let base_selector: Arc<
|
||||
dyn Selector<
|
||||
Context = crate::metasrv::SelectorContext,
|
||||
Output = Vec<common_meta::peer::Peer>,
|
||||
>,
|
||||
> = match opts.selector {
|
||||
SelectorType::LoadBased => Arc::new(LoadBasedSelector::new(
|
||||
RegionNumsBasedWeightCompute,
|
||||
meta_peer_client.clone(),
|
||||
)) as SelectorRef,
|
||||
SelectorType::LeaseBased => Arc::new(LeaseBasedSelector) as SelectorRef,
|
||||
SelectorType::RoundRobin => {
|
||||
Arc::new(RoundRobinSelector::new(SelectTarget::Datanode)) as SelectorRef
|
||||
}
|
||||
};
|
||||
info!(
|
||||
"Using selector from options, selector type: {}",
|
||||
opts.selector.as_ref()
|
||||
);
|
||||
|
||||
let selector = if let Some(factory) = plugins.get::<SelectorFactoryRef>() {
|
||||
info!("Building selector from plugin factory");
|
||||
factory.build(SelectorFactoryContext {
|
||||
metasrv_options: opts.clone(),
|
||||
meta_peer_client: meta_peer_client.clone(),
|
||||
in_memory: in_memory.clone(),
|
||||
election: election.clone(),
|
||||
base_selector,
|
||||
})
|
||||
} else {
|
||||
let selector: Arc<
|
||||
dyn Selector<
|
||||
Context = crate::metasrv::SelectorContext,
|
||||
Output = Vec<common_meta::peer::Peer>,
|
||||
>,
|
||||
> = match opts.selector {
|
||||
SelectorType::LoadBased => Arc::new(LoadBasedSelector::new(
|
||||
RegionNumsBasedWeightCompute,
|
||||
meta_peer_client.clone(),
|
||||
)) as SelectorRef,
|
||||
SelectorType::LeaseBased => Arc::new(LeaseBasedSelector) as SelectorRef,
|
||||
SelectorType::RoundRobin => {
|
||||
Arc::new(RoundRobinSelector::new(SelectTarget::Datanode)) as SelectorRef
|
||||
}
|
||||
};
|
||||
info!(
|
||||
"Using selector from options, selector type: {}",
|
||||
opts.selector.as_ref()
|
||||
);
|
||||
selector
|
||||
base_selector
|
||||
};
|
||||
|
||||
Ok(MetasrvBuilder::new()
|
||||
@@ -429,3 +437,47 @@ pub(crate) fn build_default_meta_peer_client(
|
||||
// Safety: all required fields set at initialization
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
|
||||
use common_meta::kv_backend::memory::MemoryKvBackend;
|
||||
|
||||
use super::*;
|
||||
use crate::metasrv::{SelectorFactory, SelectorFactoryContext};
|
||||
|
||||
struct RecordingSelectorFactory {
|
||||
called: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl SelectorFactory for RecordingSelectorFactory {
|
||||
fn build(&self, ctx: SelectorFactoryContext) -> SelectorRef {
|
||||
self.called.store(true, Ordering::Relaxed);
|
||||
ctx.base_selector
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn metasrv_builder_builds_load_based_selector_from_plugin_factory() {
|
||||
let called = Arc::new(AtomicBool::new(false));
|
||||
let plugins = Plugins::new();
|
||||
plugins.insert(Arc::new(RecordingSelectorFactory {
|
||||
called: called.clone(),
|
||||
}) as SelectorFactoryRef);
|
||||
let opts = MetasrvOptions {
|
||||
selector: SelectorType::LoadBased,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
metasrv_builder(
|
||||
&opts,
|
||||
plugins,
|
||||
Some(Arc::new(MemoryKvBackend::new()) as KvBackendRef),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(called.load(Ordering::Relaxed));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -482,6 +482,28 @@ pub struct SelectorContext {
|
||||
}
|
||||
|
||||
pub type SelectorRef = Arc<dyn Selector<Context = SelectorContext, Output = Vec<Peer>>>;
|
||||
|
||||
/// Context passed to a selector factory during metasrv bootstrap.
|
||||
///
|
||||
/// The factory runs after bootstrap has constructed the selector configured by
|
||||
/// [`MetasrvOptions::selector`], so plugins can either decorate `base_selector` or
|
||||
/// build a completely different selector using bootstrap-only dependencies like
|
||||
/// [`MetaPeerClientRef`].
|
||||
pub struct SelectorFactoryContext {
|
||||
pub metasrv_options: MetasrvOptions,
|
||||
pub meta_peer_client: MetaPeerClientRef,
|
||||
pub in_memory: ResettableKvBackendRef,
|
||||
pub election: Option<ElectionRef>,
|
||||
pub base_selector: SelectorRef,
|
||||
}
|
||||
|
||||
/// Builds the final datanode selector metasrv should use.
|
||||
pub trait SelectorFactory: Send + Sync {
|
||||
fn build(&self, ctx: SelectorFactoryContext) -> SelectorRef;
|
||||
}
|
||||
|
||||
/// Shared selector factory plugin registered through [`common_base::Plugins`].
|
||||
pub type SelectorFactoryRef = Arc<dyn SelectorFactory>;
|
||||
pub type RegionStatAwareSelectorRef =
|
||||
Arc<dyn RegionStatAwareSelector<Context = SelectorContext, Output = Vec<(RegionId, Peer)>>>;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user