diff --git a/Cargo.lock b/Cargo.lock index 563841d26f..f0512ecd59 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9501,6 +9501,7 @@ name = "plugins" version = "1.0.0-beta.2" dependencies = [ "auth", + "catalog", "clap 4.5.40", "cli", "common-base", @@ -9509,6 +9510,7 @@ dependencies = [ "datanode", "flow", "frontend", + "meta-client", "meta-srv", "serde", "snafu 0.8.6", diff --git a/src/cmd/src/flownode.rs b/src/cmd/src/flownode.rs index dfa5169d6e..6cefdb0f79 100644 --- a/src/cmd/src/flownode.rs +++ b/src/cmd/src/flownode.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use std::time::Duration; use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry}; -use catalog::CatalogManagerRef; use catalog::information_extension::DistributedInformationExtension; use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManagerBuilder, MetaKvBackend}; use clap::Parser; @@ -26,14 +25,12 @@ use client::client_manager::NodeClients; use common_base::Plugins; use common_config::{Configurable, DEFAULT_DATA_HOME}; use common_grpc::channel_manager::ChannelConfig; -use common_meta::FlownodeId; use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder}; use common_meta::heartbeat::handler::HandlerGroupExecutor; use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; use common_meta::key::TableMetadataManager; use common_meta::key::flow::FlowMetadataManager; -use common_meta::kv_backend::KvBackendRef; use common_stat::ResourceStatImpl; use common_telemetry::info; use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions}; @@ -43,6 +40,7 @@ use flow::{ get_flow_auth_options, }; use meta_client::{MetaClientOptions, MetaClientType}; +use plugins::flownode::context::GrpcConfigureContext; use servers::configurator::GrpcBuilderConfiguratorRef; use snafu::{OptionExt, ResultExt, ensure}; use tracing_appender::non_blocking::WorkerGuard; @@ -435,11 +433,3 @@ impl StartCommand { Ok(Instance::new(flownode, guard)) } } - -/// The context for [`GrpcBuilderConfiguratorRef`] in flownode. -pub struct GrpcConfigureContext { - pub kv_backend: KvBackendRef, - pub fe_client: Arc, - pub flownode_id: FlownodeId, - pub catalog_manager: CatalogManagerRef, -} diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index 5be3ff897b..d74b3cee5c 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -45,7 +45,10 @@ use frontend::frontend::Frontend; use frontend::heartbeat::HeartbeatTask; use frontend::instance::builder::FrontendBuilder; use frontend::server::Services; -use meta_client::{MetaClientOptions, MetaClientRef, MetaClientType}; +use meta_client::{MetaClientOptions, MetaClientType}; +use plugins::frontend::context::{ + CatalogManagerConfigureContext, DistributedCatalogManagerConfigureContext, +}; use servers::addrs; use servers::grpc::GrpcOptions; use servers::tls::{TlsMode, TlsOption}; @@ -423,9 +426,11 @@ impl StartCommand { let builder = if let Some(configurator) = plugins.get::>() { - let ctx = CatalogManagerConfigureContext { + let ctx = DistributedCatalogManagerConfigureContext { meta_client: meta_client.clone(), }; + let ctx = CatalogManagerConfigureContext::Distributed(ctx); + configurator .configure(builder, ctx) .await @@ -482,11 +487,6 @@ impl StartCommand { } } -/// The context for [`CatalogManagerConfigratorRef`] in frontend. -pub struct CatalogManagerConfigureContext { - pub meta_client: MetaClientRef, -} - #[cfg(test)] mod tests { use std::io::Write; diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 4f7cec3552..1ef16a830f 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -32,7 +32,7 @@ use common_meta::cache::LayeredCacheRegistryBuilder; use common_meta::ddl::flow_meta::FlowMetadataAllocator; use common_meta::ddl::table_meta::TableMetadataAllocator; use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl}; -use common_meta::ddl_manager::{DdlManager, DdlManagerConfiguratorRef, DdlManagerConfigureContext}; +use common_meta::ddl_manager::{DdlManager, DdlManagerConfiguratorRef}; use common_meta::key::flow::FlowMetadataManager; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; @@ -58,6 +58,10 @@ use frontend::instance::StandaloneDatanodeManager; use frontend::instance::builder::FrontendBuilder; use frontend::server::Services; use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ}; +use plugins::frontend::context::{ + CatalogManagerConfigureContext, StandaloneCatalogManagerConfigureContext, +}; +use plugins::standalone::context::DdlManagerConfigureContext; use servers::tls::{TlsMode, TlsOption}; use snafu::ResultExt; use standalone::StandaloneInformationExtension; @@ -414,9 +418,10 @@ impl StartCommand { let builder = if let Some(configurator) = plugins.get::>() { - let ctx = CatalogManagerConfigureContext { + let ctx = StandaloneCatalogManagerConfigureContext { fe_client: frontend_client.clone(), }; + let ctx = CatalogManagerConfigureContext::Standalone(ctx); configurator .configure(builder, ctx) .await @@ -506,9 +511,13 @@ impl StartCommand { let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager.clone(), true) .context(error::InitDdlManagerSnafu)?; - let ddl_manager = if let Some(configurator) = plugins.get::() { + let ddl_manager = if let Some(configurator) = + plugins.get::>() + { let ctx = DdlManagerConfigureContext { kv_backend: kv_backend.clone(), + fe_client: frontend_client.clone(), + catalog_manager: catalog_manager.clone(), }; configurator .configure(ddl_manager, ctx) @@ -595,11 +604,6 @@ impl StartCommand { } } -/// The context for [`CatalogManagerConfigratorRef`] in standalone. -pub struct CatalogManagerConfigureContext { - pub fe_client: Arc, -} - #[cfg(test)] mod tests { use std::default::Default; diff --git a/src/common/base/src/plugins.rs b/src/common/base/src/plugins.rs index bbab003c69..aa1a9d1287 100644 --- a/src/common/base/src/plugins.rs +++ b/src/common/base/src/plugins.rs @@ -32,7 +32,12 @@ impl Plugins { pub fn insert(&self, value: T) { let last = self.write().insert(value); - assert!(last.is_none(), "each type of plugins must be one and only"); + if last.is_some() { + panic!( + "Plugin of type {} already exists", + std::any::type_name::() + ); + } } pub fn get(&self) -> Option { @@ -140,7 +145,7 @@ mod tests { } #[test] - #[should_panic(expected = "each type of plugins must be one and only")] + #[should_panic(expected = "Plugin of type i32 already exists")] fn test_plugin_uniqueness() { let plugins = Plugins::new(); plugins.insert(1i32); diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 49d547a459..56cee9697b 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -46,7 +46,6 @@ use crate::error::{ use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; -use crate::kv_backend::KvBackendRef; use crate::procedure_executor::ExecutorContext; #[cfg(feature = "enterprise")] use crate::rpc::ddl::DdlTask::CreateTrigger; @@ -70,20 +69,16 @@ use crate::rpc::router::RegionRoute; /// A configurator that customizes or enhances a [`DdlManager`]. #[async_trait::async_trait] -pub trait DdlManagerConfigurator: Send + Sync { +pub trait DdlManagerConfigurator: Send + Sync { /// Configures the given [`DdlManager`] using the provided [`DdlManagerConfigureContext`]. async fn configure( &self, ddl_manager: DdlManager, - ctx: DdlManagerConfigureContext, + ctx: C, ) -> std::result::Result; } -pub type DdlManagerConfiguratorRef = Arc; - -pub struct DdlManagerConfigureContext { - pub kv_backend: KvBackendRef, -} +pub type DdlManagerConfiguratorRef = Arc>; pub type DdlManagerRef = Arc; diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index be9e3a0fad..cbefb79cfa 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -28,7 +28,7 @@ use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocato use common_meta::ddl::{ DdlContext, NoopRegionFailureDetectorControl, RegionFailureDetectorControllerRef, }; -use common_meta::ddl_manager::{DdlManager, DdlManagerConfiguratorRef, DdlManagerConfigureContext}; +use common_meta::ddl_manager::{DdlManager, DdlManagerConfiguratorRef}; use common_meta::distributed_time_constants::{self}; use common_meta::key::TableMetadataManager; use common_meta::key::flow::FlowMetadataManager; @@ -405,10 +405,11 @@ impl MetasrvBuilder { let ddl_manager = if let Some(configurator) = plugins .as_ref() - .and_then(|p| p.get::()) + .and_then(|p| p.get::>()) { let ctx = DdlManagerConfigureContext { kv_backend: kv_backend.clone(), + meta_peer_client: meta_peer_client.clone(), }; configurator .configure(ddl_manager, ctx) @@ -637,3 +638,9 @@ impl Default for MetasrvBuilder { Self::new() } } + +/// The context for [`DdlManagerConfiguratorRef`]. +pub struct DdlManagerConfigureContext { + pub kv_backend: KvBackendRef, + pub meta_peer_client: MetaPeerClientRef, +} diff --git a/src/plugins/Cargo.toml b/src/plugins/Cargo.toml index 14df62c4fa..658e1c95e3 100644 --- a/src/plugins/Cargo.toml +++ b/src/plugins/Cargo.toml @@ -9,6 +9,7 @@ workspace = true [dependencies] auth.workspace = true +catalog.workspace = true clap.workspace = true cli.workspace = true common-base.workspace = true @@ -17,6 +18,7 @@ common-meta.workspace = true datanode.workspace = true flow.workspace = true frontend.workspace = true +meta-client.workspace = true meta-srv.workspace = true serde.workspace = true snafu.workspace = true diff --git a/src/plugins/src/flownode.rs b/src/plugins/src/flownode.rs index 6b56b008da..9fbb018030 100644 --- a/src/plugins/src/flownode.rs +++ b/src/plugins/src/flownode.rs @@ -30,3 +30,20 @@ pub async fn setup_flownode_plugins( pub async fn start_flownode_plugins(_plugins: Plugins) -> Result<()> { Ok(()) } + +pub mod context { + use std::sync::Arc; + + use catalog::CatalogManagerRef; + use common_meta::FlownodeId; + use common_meta::kv_backend::KvBackendRef; + use flow::FrontendClient; + + /// The context for `GrpcBuilderConfiguratorRef` in flownode. + pub struct GrpcConfigureContext { + pub kv_backend: KvBackendRef, + pub fe_client: Arc, + pub flownode_id: FlownodeId, + pub catalog_manager: CatalogManagerRef, + } +} diff --git a/src/plugins/src/frontend.rs b/src/plugins/src/frontend.rs index 85049d8f80..0d1c1af7b9 100644 --- a/src/plugins/src/frontend.rs +++ b/src/plugins/src/frontend.rs @@ -40,3 +40,25 @@ pub async fn setup_frontend_plugins( pub async fn start_frontend_plugins(_plugins: Plugins) -> Result<()> { Ok(()) } + +pub mod context { + use std::sync::Arc; + + use flow::FrontendClient; + use meta_client::MetaClientRef; + + /// The context for [`catalog::kvbackend::CatalogManagerConfiguratorRef`] in standalone or + /// distributed. + pub enum CatalogManagerConfigureContext { + Distributed(DistributedCatalogManagerConfigureContext), + Standalone(StandaloneCatalogManagerConfigureContext), + } + + pub struct DistributedCatalogManagerConfigureContext { + pub meta_client: MetaClientRef, + } + + pub struct StandaloneCatalogManagerConfigureContext { + pub fe_client: Arc, + } +} diff --git a/src/plugins/src/lib.rs b/src/plugins/src/lib.rs index 9a979a23a1..c973cb3131 100644 --- a/src/plugins/src/lib.rs +++ b/src/plugins/src/lib.rs @@ -13,12 +13,12 @@ // limitations under the License. mod cli; -mod datanode; -mod flownode; -mod frontend; +pub mod datanode; +pub mod flownode; +pub mod frontend; mod meta_srv; mod options; -mod standalone; +pub mod standalone; pub use cli::SubCommand; pub use datanode::{setup_datanode_plugins, start_datanode_plugins}; diff --git a/src/plugins/src/standalone.rs b/src/plugins/src/standalone.rs index 97b1c22aa7..0cb7ee60e5 100644 --- a/src/plugins/src/standalone.rs +++ b/src/plugins/src/standalone.rs @@ -33,3 +33,18 @@ pub async fn setup_standalone_plugins( pub async fn start_standalone_plugins(_plugins: Plugins) -> Result<()> { Ok(()) } + +pub mod context { + use std::sync::Arc; + + use catalog::CatalogManagerRef; + use common_meta::kv_backend::KvBackendRef; + use flow::FrontendClient; + + /// The context for [`common_meta::ddl_manager::DdlManagerConfiguratorRef`] in standalone. + pub struct DdlManagerConfigureContext { + pub kv_backend: KvBackendRef, + pub fe_client: Arc, + pub catalog_manager: CatalogManagerRef, + } +}