feat: introduce plugin setup functions with richer context (#8256)

feat: enrich plugin setup context
This commit is contained in:
Ning Sun
2026-06-08 14:53:08 +08:00
committed by GitHub
parent e7ce3ac0c7
commit fd64ced4da
17 changed files with 282 additions and 67 deletions

View File

@@ -46,7 +46,12 @@ impl InstanceBuilder {
) -> Result<Self> {
let guard = Self::init(&mut opts, &mut plugins).await?;
let datanode_builder = Self::datanode_builder(&opts, plugins).await?;
let mut datanode_builder = Self::datanode_builder(&opts, &plugins).await?;
plugins::setup_datanode_plugins_post_build(&mut plugins, &opts.plugins, &datanode_builder)
.await
.context(StartDatanodeSnafu)?;
datanode_builder.set_plugins(plugins);
Ok(Self {
guard,
@@ -71,7 +76,7 @@ impl InstanceBuilder {
maybe_activate_heap_profile(&dn_opts.memory);
create_resource_limit_metrics(APP_NAME);
plugins::setup_datanode_plugins(plugins, &opts.plugins, dn_opts)
plugins::setup_datanode_plugins_pre_build(plugins, &opts.plugins, dn_opts)
.await
.context(StartDatanodeSnafu)?;
@@ -81,7 +86,10 @@ impl InstanceBuilder {
Ok(guard)
}
async fn datanode_builder(opts: &DatanodeOptions, plugins: Plugins) -> Result<DatanodeBuilder> {
async fn datanode_builder(
opts: &DatanodeOptions,
plugins: &Plugins,
) -> Result<DatanodeBuilder> {
let dn_opts = &opts.component;
let member_id = dn_opts
@@ -93,7 +101,7 @@ impl InstanceBuilder {
let client = meta_client::create_meta_client(
MetaClientType::Datanode { member_id },
meta_client_options,
Some(&plugins),
Some(plugins),
None,
)
.await

View File

@@ -278,7 +278,7 @@ impl StartCommand {
opts.grpc.detect_server_addr();
let mut plugins = Plugins::new();
plugins::setup_flownode_plugins(&mut plugins, &plugin_opts, &opts)
plugins::setup_flownode_plugins_pre_build(&mut plugins, &plugin_opts, &opts)
.await
.context(StartFlownodeSnafu)?;
@@ -376,7 +376,7 @@ impl StartCommand {
)
.context(StartFlownodeSnafu)?;
let frontend_client = Arc::new(frontend_client);
let flownode_builder = FlownodeBuilder::new(
let mut flownode_builder = FlownodeBuilder::new(
opts.clone(),
plugins.clone(),
table_metadata_manager,
@@ -386,6 +386,11 @@ impl StartCommand {
)
.with_heartbeat_task(heartbeat_task);
plugins::setup_flownode_plugins_post_build(&mut plugins, &plugin_opts, &flownode_builder)
.await
.context(StartFlownodeSnafu)?;
flownode_builder.set_plugins(plugins.clone());
let mut flownode = flownode_builder.build().await.context(StartFlownodeSnafu)?;
let builder =

View File

@@ -52,7 +52,6 @@ use plugins::PluginOptions;
use plugins::frontend::context::{
CatalogManagerConfigureContext, DistributedCatalogManagerConfigureContext,
};
use plugins::frontend::setup_frontend_dynamic_plugins;
use plugins::options::PluginOptionsDeserializerImpl;
use servers::addrs;
use servers::grpc::GrpcOptions;
@@ -354,10 +353,6 @@ impl StartCommand {
let plugin_opts = opts.plugins;
let mut opts = opts.component;
opts.grpc.detect_server_addr();
let mut plugins = Plugins::new();
plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &opts)
.await
.context(error::StartFrontendSnafu)?;
set_default_timezone(opts.default_timezone.as_deref()).context(error::InitTimezoneSnafu)?;
set_default_prefix(opts.default_column_prefix.as_deref())
@@ -375,6 +370,29 @@ impl StartCommand {
let cache_ttl = meta_client_options.metadata_cache_ttl;
let cache_tti = meta_client_options.metadata_cache_tti;
let meta_config: Vec<PluginOptions> = meta_client::create_meta_client(
MetaClientType::Frontend,
meta_client_options,
None,
None,
)
.await
.context(error::MetaClientInitSnafu)?
.pull_config(PluginOptionsDeserializerImpl)
.await
.context(error::MetaClientInitSnafu)?;
let mut plugins = Plugins::new();
plugins::setup_frontend_plugins_pre_build(
&mut plugins,
&plugin_opts,
&opts,
Some(&meta_config),
)
.await
.context(error::StartFrontendSnafu)?;
// now initialize the meta_client with plugins
let meta_client = meta_client::create_meta_client(
MetaClientType::Frontend,
meta_client_options,
@@ -384,14 +402,6 @@ impl StartCommand {
.await
.context(error::MetaClientInitSnafu)?;
let meta_config: Vec<PluginOptions> = meta_client
.pull_config(PluginOptionsDeserializerImpl)
.await
.context(error::MetaClientInitSnafu)?;
setup_frontend_dynamic_plugins(meta_config, &mut plugins)
.await
.context(error::StartFrontendSnafu)?;
let readonly_meta_backend = new_read_only_meta_kv_backend(meta_client.clone());
// TODO(discord9): add helper function to ease the creation of cache registry&such
@@ -470,7 +480,7 @@ impl StartCommand {
};
let catalog_manager = builder.build();
let instance = FrontendBuilder::new(
let builder = FrontendBuilder::new(
opts.clone(),
cached_meta_backend.clone(),
layered_cache_registry.clone(),
@@ -478,12 +488,18 @@ impl StartCommand {
client,
meta_client.clone(),
process_manager,
)
.with_plugin(plugins.clone())
.with_local_cache_invalidator(layered_cache_registry)
.try_build()
.await
.context(error::StartFrontendSnafu)?;
);
plugins::setup_frontend_plugins_post_build(&mut plugins, &plugin_opts, &builder)
.await
.context(error::StartFrontendSnafu)?;
let instance = builder
.with_plugin(plugins.clone())
.with_local_cache_invalidator(layered_cache_registry)
.try_build()
.await
.context(error::StartFrontendSnafu)?;
let heartbeat_task = Some(create_heartbeat_task(&opts, meta_client, &instance));
@@ -638,7 +654,7 @@ mod tests {
};
let mut plugins = Plugins::new();
plugins::setup_frontend_plugins(&mut plugins, &[], &fe_opts)
plugins::setup_frontend_plugins_pre_build(&mut plugins, &[], &fe_opts, None)
.await
.unwrap();

View File

@@ -342,14 +342,23 @@ impl StartCommand {
info!("Metasrv options: {:#?}", opts);
let mut plugins = Plugins::new();
plugins::setup_metasrv_plugins(&mut plugins, &plugin_opts, &opts)
plugins::setup_metasrv_plugins_pre_build(&mut plugins, &plugin_opts, &opts)
.await
.context(StartMetaServerSnafu)?;
let builder = metasrv_builder(&opts, plugins, None)
let builder = metasrv_builder(&opts, &plugins, None)
.await
.context(error::BuildMetaServerSnafu)?;
plugins::setup_metasrv_plugins_post_build(&mut plugins, &plugin_opts, &builder)
.await
.context(StartMetaServerSnafu)?;
let metasrv = builder
.plugins(plugins)
.build()
.await
.context(error::BuildMetaServerSnafu)?;
let metasrv = builder.build().await.context(error::BuildMetaServerSnafu)?;
let instance = MetasrvInstance::new(metasrv)
.await

View File

@@ -380,11 +380,11 @@ impl StartCommand {
let node_id = dn_opts.node_id;
let init_regions_parallelism = dn_opts.init_regions_parallelism;
plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &fe_opts)
plugins::setup_frontend_plugins_pre_build(&mut plugins, &plugin_opts, &fe_opts, None)
.await
.context(error::StartFrontendSnafu)?;
plugins::setup_datanode_plugins(&mut plugins, &plugin_opts, &dn_opts)
plugins::setup_datanode_plugins_pre_build(&mut plugins, &plugin_opts, &dn_opts)
.await
.context(error::StartDatanodeSnafu)?;
@@ -429,6 +429,12 @@ impl StartCommand {
if let Some(writable) = creator.open_regions_writable_override {
builder.with_open_regions_writable_override(writable);
}
plugins::setup_datanode_plugins_post_build(&mut plugins, &plugin_opts, &builder)
.await
.context(error::StartDatanodeSnafu)?;
builder.set_plugins(plugins.clone());
let datanode = builder.build().await.context(error::StartDatanodeSnafu)?;
let information_extension = Arc::new(StandaloneInformationExtension::new(
@@ -478,7 +484,7 @@ impl StartCommand {
..Default::default()
};
let flow_builder = FlownodeBuilder::new(
let mut flow_builder = FlownodeBuilder::new(
flownode_options,
plugins.clone(),
table_metadata_manager.clone(),
@@ -486,6 +492,12 @@ impl StartCommand {
flow_metadata_manager.clone(),
frontend_client.clone(),
);
plugins::setup_flownode_plugins_post_build(&mut plugins, &plugin_opts, &flow_builder)
.await
.context(error::StartFlownodeSnafu)?;
flow_builder.set_plugins(plugins.clone());
let flownode = flow_builder
.build()
.await
@@ -578,11 +590,17 @@ impl StartCommand {
node_manager.clone(),
procedure_executor.clone(),
process_manager,
)
.with_plugin(plugins.clone())
.try_build()
.await
.context(error::StartFrontendSnafu)?;
);
plugins::setup_frontend_plugins_post_build(&mut plugins, &plugin_opts, &fe_instance)
.await
.context(error::StartFrontendSnafu)?;
let fe_instance = fe_instance
.with_plugin(plugins.clone())
.try_build()
.await
.context(error::StartFrontendSnafu)?;
let fe_instance = Arc::new(fe_instance);
// set the frontend client for flownode
@@ -959,7 +977,7 @@ mod tests {
let mut plugins = Plugins::new();
plugins.insert(StandaloneFlag);
plugins::setup_frontend_plugins(&mut plugins, &[], &fe_opts)
plugins::setup_frontend_plugins_pre_build(&mut plugins, &[], &fe_opts, None)
.await
.unwrap();

View File

@@ -30,7 +30,7 @@ pub trait PluginOptionsDeserializer<T: DeserializeOwned>: Send + Sync {
/// A flag for stating the standalone mode in the plugins.
///
/// The standalone build and start process calls `setup_frontend_plugins` and `setup_datanode_plugins`,
/// The standalone build and start process calls `setup_frontend_plugins_pre_build` and `setup_datanode_plugins_pre_build`,
/// so we add a flag to the plugins to indicate that the plugins are running in the standalone mode.
#[derive(Clone, Copy, Debug)]
pub struct StandaloneFlag;

View File

@@ -202,6 +202,18 @@ impl DatanodeBuilder {
&self.kv_backend
}
pub fn meta_client(&self) -> Option<&MetaClientRef> {
self.meta_client.as_ref()
}
pub fn cache_registry(&self) -> Option<&Arc<LayeredCacheRegistry>> {
self.cache_registry.as_ref()
}
pub fn set_plugins(&mut self, plugins: Plugins) {
self.plugins = plugins;
}
pub fn with_table_provider_factory(&mut self, factory: TableProviderFactoryRef) -> &mut Self {
self.table_provider_factory = Some(factory);
self

View File

@@ -348,6 +348,30 @@ impl FlownodeBuilder {
}
}
pub fn opts(&self) -> &FlownodeOptions {
&self.opts
}
pub fn table_meta(&self) -> &TableMetadataManagerRef {
&self.table_meta
}
pub fn catalog_manager(&self) -> &CatalogManagerRef {
&self.catalog_manager
}
pub fn flow_metadata_manager(&self) -> &FlowMetadataManagerRef {
&self.flow_metadata_manager
}
pub fn frontend_client(&self) -> &Arc<FrontendClient> {
&self.frontend_client
}
pub fn set_plugins(&mut self, plugins: Plugins) {
self.plugins = plugins;
}
pub async fn build(mut self) -> Result<FlownodeInstance, Error> {
// TODO(discord9): does this query engine need those?
let query_engine_factory = QueryEngineFactory::new_with_plugins(

View File

@@ -132,6 +132,34 @@ impl FrontendBuilder {
}
}
pub fn options(&self) -> &FrontendOptions {
&self.options
}
pub fn kv_backend(&self) -> &KvBackendRef {
&self.kv_backend
}
pub fn layered_cache_registry(&self) -> &LayeredCacheRegistryRef {
&self.layered_cache_registry
}
pub fn catalog_manager(&self) -> &CatalogManagerRef {
&self.catalog_manager
}
pub fn node_manager(&self) -> &NodeManagerRef {
&self.node_manager
}
pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
&self.procedure_executor
}
pub fn process_manager(&self) -> &ProcessManagerRef {
&self.process_manager
}
pub fn with_plugin(self, plugins: Plugins) -> Self {
Self {
plugins: Some(plugins),

View File

@@ -261,7 +261,7 @@ pub fn router(metasrv: Arc<Metasrv>) -> Router {
pub async fn metasrv_builder(
opts: &MetasrvOptions,
plugins: Plugins,
plugins: &Plugins,
kv_backend: Option<KvBackendRef>,
) -> Result<MetasrvBuilder> {
let (mut kv_backend, election) = match (kv_backend, &opts.backend) {
@@ -421,8 +421,7 @@ pub async fn metasrv_builder(
.in_memory(in_memory)
.selector(selector)
.election(election)
.meta_peer_client(meta_peer_client)
.plugins(plugins))
.meta_peer_client(meta_peer_client))
}
pub(crate) fn build_default_meta_peer_client(
@@ -472,7 +471,7 @@ mod tests {
metasrv_builder(
&opts,
plugins,
&plugins,
Some(Arc::new(MemoryKvBackend::new()) as KvBackendRef),
)
.await

View File

@@ -176,6 +176,30 @@ impl MetasrvBuilder {
self
}
pub fn options_ref(&self) -> Option<&MetasrvOptions> {
self.options.as_ref()
}
pub fn kv_backend_ref(&self) -> Option<&KvBackendRef> {
self.kv_backend.as_ref()
}
pub fn in_memory_ref(&self) -> Option<&ResettableKvBackendRef> {
self.in_memory.as_ref()
}
pub fn election_ref(&self) -> Option<&ElectionRef> {
self.election.as_ref()
}
pub fn meta_peer_client_ref(&self) -> Option<&MetaPeerClientRef> {
self.meta_peer_client.as_ref()
}
pub fn node_manager_ref(&self) -> Option<&NodeManagerRef> {
self.node_manager.as_ref()
}
pub async fn build(self) -> Result<Metasrv> {
let MetasrvBuilder {
election,

View File

@@ -14,14 +14,15 @@
use common_base::Plugins;
use datanode::config::DatanodeOptions;
use datanode::datanode::Datanode;
use datanode::datanode::{Datanode, DatanodeBuilder};
use datanode::error::Result;
use crate::options::PluginOptions;
/// Sets up datanode plugins before the [`DatanodeBuilder`] is constructed.
#[allow(unused_variables)]
#[allow(unused_mut)]
pub async fn setup_datanode_plugins(
pub async fn setup_datanode_plugins_pre_build(
plugins: &mut Plugins,
plugin_options: &[PluginOptions],
dn_opts: &DatanodeOptions,
@@ -29,6 +30,21 @@ pub async fn setup_datanode_plugins(
Ok(())
}
/// Sets up datanode plugins after the [`DatanodeBuilder`] is constructed
/// but before [`DatanodeBuilder::build()`].
///
/// Plugins can read context from the builder (e.g., kv_backend, options)
/// and insert additional plugins. After this call, [`DatanodeBuilder::set_plugins()`]
/// should be called to sync plugins into the builder.
#[allow(unused_variables)]
pub async fn setup_datanode_plugins_post_build(
plugins: &mut Plugins,
plugin_options: &[PluginOptions],
builder: &DatanodeBuilder,
) -> Result<()> {
Ok(())
}
pub async fn start_datanode_plugins(_instance: &Datanode) -> Result<()> {
Ok(())
}

View File

@@ -14,19 +14,35 @@
use common_base::Plugins;
use flow::error::Result;
use flow::{FlownodeInstance, FlownodeOptions};
use flow::{FlownodeBuilder, FlownodeInstance, FlownodeOptions};
use crate::options::PluginOptions;
#[allow(unused_mut)]
pub async fn setup_flownode_plugins(
_plugins: &mut Plugins,
_plugin_options: &[PluginOptions],
/// Sets up flownode plugins before the [`FlownodeBuilder`] is constructed.
#[allow(unused_mut, unused_variables)]
pub async fn setup_flownode_plugins_pre_build(
plugins: &mut Plugins,
plugin_options: &[PluginOptions],
_fn_opts: &FlownodeOptions,
) -> Result<()> {
Ok(())
}
/// Sets up flownode plugins after the [`FlownodeBuilder`] is constructed
/// but before [`FlownodeBuilder::build()`].
///
/// Plugins can read context from the builder (e.g., opts, catalog_manager, flow_metadata_manager)
/// and insert additional plugins. After this call, [`FlownodeBuilder::set_plugins()`]
/// should be called to sync plugins into the builder.
#[allow(unused_variables)]
pub async fn setup_flownode_plugins_post_build(
plugins: &mut Plugins,
plugin_options: &[PluginOptions],
builder: &FlownodeBuilder,
) -> Result<()> {
Ok(())
}
pub async fn start_flownode_plugins(_instance: &FlownodeInstance) -> Result<()> {
Ok(())
}

View File

@@ -18,15 +18,27 @@ use common_meta::cache::CacheRegistryBuilder;
use frontend::error::{IllegalAuthConfigSnafu, Result};
use frontend::frontend::FrontendOptions;
use frontend::instance::Instance;
use frontend::instance::builder::FrontendBuilder;
use snafu::ResultExt;
use crate::options::PluginOptions;
/// Sets up frontend plugins before the [`FrontendBuilder`] is constructed.
///
/// This is where "infrastructure configurators" are registered — plugins that the builder
/// consumes during construction (e.g., `CatalogManagerConfiguratorRef`, cache invalidators).
///
/// In distributed mode this is called twice:
/// 1. First without meta config (before `create_meta_client`), for plugins needed by the meta client.
/// 2. Second with meta config pulled from metasrv, for dynamic configurators.
///
/// In standalone mode it is called once with `None`.
#[allow(unused_mut)]
pub async fn setup_frontend_plugins(
pub async fn setup_frontend_plugins_pre_build(
plugins: &mut Plugins,
_plugin_options: &[PluginOptions],
fe_opts: &FrontendOptions,
_meta_config: Option<&[PluginOptions]>,
) -> Result<()> {
if let Some(user_provider) = fe_opts.user_provider.as_ref() {
let provider =
@@ -39,15 +51,15 @@ pub async fn setup_frontend_plugins(
Ok(())
}
/// Setup dynamic plugins based on the meta config in frontend.
/// This is called after the `setup_frontend_plugins` because the meta client needs to be created first.
/// Sets up frontend plugins after the [`FrontendBuilder`] is constructed
/// but before [`FrontendBuilder::try_build()`] and [`FrontendBuilder::with_plugin()`].
///
/// For those configs/plugins which are corresponding with the metasrv's config,
/// we pull from metasrv first, then create/override the current config/plugin.
/// Note: make sure the override works as expected.
pub async fn setup_frontend_dynamic_plugins(
_meta_config: Vec<PluginOptions>,
/// This is where "feature plugins" are registered — plugins that consume builder context
/// (e.g., `KvBackendRef`, `CatalogManagerRef`) to construct themselves.
pub async fn setup_frontend_plugins_post_build(
_plugins: &mut Plugins,
_plugin_options: &[PluginOptions],
_builder: &FrontendBuilder,
) -> Result<()> {
Ok(())
}

View File

@@ -21,9 +21,17 @@ pub mod options;
pub mod standalone;
pub use cli::SubCommand;
pub use datanode::{setup_datanode_plugins, start_datanode_plugins};
pub use flownode::{setup_flownode_plugins, start_flownode_plugins};
pub use frontend::{setup_frontend_plugins, start_frontend_plugins};
pub use meta_srv::{setup_metasrv_plugins, start_metasrv_plugins};
pub use datanode::{
setup_datanode_plugins_post_build, setup_datanode_plugins_pre_build, start_datanode_plugins,
};
pub use flownode::{
setup_flownode_plugins_post_build, setup_flownode_plugins_pre_build, start_flownode_plugins,
};
pub use frontend::{
setup_frontend_plugins_post_build, setup_frontend_plugins_pre_build, start_frontend_plugins,
};
pub use meta_srv::{
setup_metasrv_plugins_post_build, setup_metasrv_plugins_pre_build, start_metasrv_plugins,
};
pub use options::PluginOptions;
pub use standalone::setup_standalone_plugins;

View File

@@ -16,14 +16,34 @@ use common_base::Plugins;
use meta_srv::bootstrap::MetasrvInstance;
use meta_srv::error::Result;
use meta_srv::metasrv::MetasrvOptions;
use meta_srv::metasrv::builder::MetasrvBuilder;
use crate::options::PluginOptions;
/// Sets up metasrv plugins before the [`MetasrvBuilder`] is constructed.
///
/// Plugins registered here are available during builder construction
/// (e.g., `SelectorFactoryRef`).
#[allow(unused_variables)]
pub async fn setup_metasrv_plugins(
_plugins: &mut Plugins,
pub async fn setup_metasrv_plugins_pre_build(
plugins: &mut Plugins,
plugin_options: &[PluginOptions],
metasrv_opts: &MetasrvOptions,
_metasrv_opts: &MetasrvOptions,
) -> Result<()> {
Ok(())
}
/// Sets up metasrv plugins after the [`MetasrvBuilder`] is constructed
/// but before [`MetasrvBuilder::build()`].
///
/// Plugins can read context from the builder (e.g., kv_backend, options)
/// and insert additional plugins. After this call, [`MetasrvBuilder::plugins()`]
/// should be called to set plugins on the builder.
#[allow(unused_variables)]
pub async fn setup_metasrv_plugins_post_build(
plugins: &mut Plugins,
plugin_options: &[PluginOptions],
builder: &MetasrvBuilder,
) -> Result<()> {
Ok(())
}

View File

@@ -289,7 +289,7 @@ pub async fn test_http_auth_from_standalone_user_provider_config() {
let mut plugins = Plugins::new();
plugins.insert(StandaloneFlag);
plugins::setup_frontend_plugins(&mut plugins, &[], &fe_opts)
plugins::setup_frontend_plugins_pre_build(&mut plugins, &[], &fe_opts, None)
.await
.unwrap();
let user_provider = plugins.get::<UserProviderRef>();