diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index 62674e2572..c41548082d 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -5,7 +5,6 @@ edition.workspace = true license.workspace = true [features] -enterprise = [] testing = [] [lints] diff --git a/src/catalog/src/kvbackend.rs b/src/catalog/src/kvbackend.rs index d7f32fc66d..334acc999c 100644 --- a/src/catalog/src/kvbackend.rs +++ b/src/catalog/src/kvbackend.rs @@ -12,13 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub use client::{CachedKvBackend, CachedKvBackendBuilder, MetaKvBackend}; - mod builder; mod client; mod manager; mod table_cache; -pub use builder::KvBackendCatalogManagerBuilder; +pub use builder::{ + CatalogManagerConfigurator, CatalogManagerConfiguratorRef, KvBackendCatalogManagerBuilder, +}; +pub use client::{CachedKvBackend, CachedKvBackendBuilder, MetaKvBackend}; pub use manager::KvBackendCatalogManager; pub use table_cache::{TableCache, TableCacheRef, new_table_cache}; diff --git a/src/catalog/src/kvbackend/builder.rs b/src/catalog/src/kvbackend/builder.rs index 247a111124..de56f81c0f 100644 --- a/src/catalog/src/kvbackend/builder.rs +++ b/src/catalog/src/kvbackend/builder.rs @@ -12,9 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::sync::Arc; use common_catalog::consts::DEFAULT_CATALOG_NAME; +use common_error::ext::BoxedError; use common_meta::cache::LayeredCacheRegistryRef; use common_meta::key::TableMetadataManager; use common_meta::key::flow::FlowMetadataManager; @@ -23,24 +25,34 @@ use common_procedure::ProcedureManagerRef; use moka::sync::Cache; use partition::manager::PartitionRuleManager; -#[cfg(feature = "enterprise")] -use crate::information_schema::InformationSchemaTableFactoryRef; -use crate::information_schema::{InformationExtensionRef, InformationSchemaProvider}; +use crate::information_schema::{ + InformationExtensionRef, InformationSchemaProvider, InformationSchemaTableFactoryRef, +}; use crate::kvbackend::KvBackendCatalogManager; use crate::kvbackend::manager::{CATALOG_CACHE_MAX_CAPACITY, SystemCatalog}; use crate::process_manager::ProcessManagerRef; use crate::system_schema::numbers_table_provider::NumbersTableProvider; use crate::system_schema::pg_catalog::PGCatalogProvider; +/// The configurator that customizes or enhances the [`KvBackendCatalogManagerBuilder`]. +#[async_trait::async_trait] +pub trait CatalogManagerConfigurator: Send + Sync { + async fn configure( + &self, + builder: KvBackendCatalogManagerBuilder, + ctx: C, + ) -> std::result::Result; +} + +pub type CatalogManagerConfiguratorRef = Arc>; + pub struct KvBackendCatalogManagerBuilder { information_extension: InformationExtensionRef, backend: KvBackendRef, cache_registry: LayeredCacheRegistryRef, procedure_manager: Option, process_manager: Option, - #[cfg(feature = "enterprise")] - extra_information_table_factories: - std::collections::HashMap, + extra_information_table_factories: HashMap, } impl KvBackendCatalogManagerBuilder { @@ -55,8 +67,7 @@ impl KvBackendCatalogManagerBuilder { cache_registry, procedure_manager: None, process_manager: None, - #[cfg(feature = "enterprise")] - extra_information_table_factories: std::collections::HashMap::new(), + extra_information_table_factories: HashMap::new(), } } @@ -71,10 +82,9 @@ impl KvBackendCatalogManagerBuilder { } /// Sets the extra information tables. - #[cfg(feature = "enterprise")] pub fn with_extra_information_table_factories( mut self, - factories: std::collections::HashMap, + factories: HashMap, ) -> Self { self.extra_information_table_factories = factories; self @@ -87,7 +97,6 @@ impl KvBackendCatalogManagerBuilder { cache_registry, procedure_manager, process_manager, - #[cfg(feature = "enterprise")] extra_information_table_factories, } = self; Arc::new_cyclic(|me| KvBackendCatalogManager { @@ -111,7 +120,6 @@ impl KvBackendCatalogManagerBuilder { process_manager.clone(), backend.clone(), ); - #[cfg(feature = "enterprise")] let provider = provider .with_extra_table_factories(extra_information_table_factories.clone()); Arc::new(provider) @@ -123,7 +131,6 @@ impl KvBackendCatalogManagerBuilder { numbers_table_provider: NumbersTableProvider, backend, process_manager, - #[cfg(feature = "enterprise")] extra_information_table_factories, }, cache_registry, diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs index 29e0cc4ce8..7852142c6a 100644 --- a/src/catalog/src/kvbackend/manager.rs +++ b/src/catalog/src/kvbackend/manager.rs @@ -53,9 +53,9 @@ use crate::error::{ CacheNotFoundSnafu, GetTableCacheSnafu, InvalidTableInfoInCatalogSnafu, ListCatalogsSnafu, ListSchemasSnafu, ListTablesSnafu, Result, TableMetadataManagerSnafu, }; -#[cfg(feature = "enterprise")] -use crate::information_schema::InformationSchemaTableFactoryRef; -use crate::information_schema::{InformationExtensionRef, InformationSchemaProvider}; +use crate::information_schema::{ + InformationExtensionRef, InformationSchemaProvider, InformationSchemaTableFactoryRef, +}; use crate::kvbackend::TableCacheRef; use crate::process_manager::ProcessManagerRef; use crate::system_schema::SystemSchemaProvider; @@ -557,7 +557,6 @@ pub(super) struct SystemCatalog { pub(super) numbers_table_provider: NumbersTableProvider, pub(super) backend: KvBackendRef, pub(super) process_manager: Option, - #[cfg(feature = "enterprise")] pub(super) extra_information_table_factories: std::collections::HashMap, } @@ -628,7 +627,6 @@ impl SystemCatalog { self.process_manager.clone(), self.backend.clone(), ); - #[cfg(feature = "enterprise")] let provider = provider .with_extra_table_factories(self.extra_information_table_factories.clone()); Arc::new(provider) diff --git a/src/catalog/src/system_schema/information_schema.rs b/src/catalog/src/system_schema/information_schema.rs index 02005c74c0..18384b8163 100644 --- a/src/catalog/src/system_schema/information_schema.rs +++ b/src/catalog/src/system_schema/information_schema.rs @@ -117,7 +117,6 @@ macro_rules! setup_memory_table { }; } -#[cfg(feature = "enterprise")] pub struct MakeInformationTableRequest { pub catalog_name: String, pub catalog_manager: Weak, @@ -128,12 +127,10 @@ pub struct MakeInformationTableRequest { /// /// This trait allows for extensibility of the information schema by providing /// a way to dynamically create custom information schema tables. -#[cfg(feature = "enterprise")] pub trait InformationSchemaTableFactory { fn make_information_table(&self, req: MakeInformationTableRequest) -> SystemTableRef; } -#[cfg(feature = "enterprise")] pub type InformationSchemaTableFactoryRef = Arc; /// The `information_schema` tables info provider. @@ -143,9 +140,7 @@ pub struct InformationSchemaProvider { process_manager: Option, flow_metadata_manager: Arc, tables: HashMap, - #[allow(dead_code)] kv_backend: KvBackendRef, - #[cfg(feature = "enterprise")] extra_table_factories: HashMap, } @@ -166,7 +161,6 @@ impl SystemSchemaProviderInner for InformationSchemaProvider { } fn system_table(&self, name: &str) -> Option { - #[cfg(feature = "enterprise")] if let Some(factory) = self.extra_table_factories.get(name) { let req = MakeInformationTableRequest { catalog_name: self.catalog_name.clone(), @@ -281,7 +275,6 @@ impl InformationSchemaProvider { process_manager, tables: HashMap::new(), kv_backend, - #[cfg(feature = "enterprise")] extra_table_factories: HashMap::new(), }; @@ -290,7 +283,6 @@ impl InformationSchemaProvider { provider } - #[cfg(feature = "enterprise")] pub(crate) fn with_extra_table_factories( mut self, factories: HashMap, @@ -358,7 +350,6 @@ impl InformationSchemaProvider { if let Some(process_list) = self.build_table(PROCESS_LIST) { tables.insert(PROCESS_LIST.to_string(), process_list); } - #[cfg(feature = "enterprise")] for name in self.extra_table_factories.keys() { tables.insert(name.clone(), self.build_table(name).expect(name)); } diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index 7a957b509b..d279ddb7f0 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -16,7 +16,7 @@ default = [ "meta-srv/pg_kvbackend", "meta-srv/mysql_kvbackend", ] -enterprise = ["common-meta/enterprise", "frontend/enterprise", "meta-srv/enterprise", "catalog/enterprise"] +enterprise = ["common-meta/enterprise", "frontend/enterprise", "meta-srv/enterprise"] tokio-console = ["common-telemetry/tokio-console"] [lints] diff --git a/src/cmd/src/flownode.rs b/src/cmd/src/flownode.rs index 07f3279724..dfa5169d6e 100644 --- a/src/cmd/src/flownode.rs +++ b/src/cmd/src/flownode.rs @@ -12,11 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::fmt::Debug; use std::path::Path; use std::sync::Arc; use std::time::Duration; use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry}; +use catalog::CatalogManagerRef; use catalog::information_extension::DistributedInformationExtension; use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManagerBuilder, MetaKvBackend}; use clap::Parser; @@ -24,12 +26,14 @@ use client::client_manager::NodeClients; use common_base::Plugins; use common_config::{Configurable, DEFAULT_DATA_HOME}; use common_grpc::channel_manager::ChannelConfig; +use common_meta::FlownodeId; use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder}; use common_meta::heartbeat::handler::HandlerGroupExecutor; use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; use common_meta::key::TableMetadataManager; use common_meta::key::flow::FlowMetadataManager; +use common_meta::kv_backend::KvBackendRef; use common_stat::ResourceStatImpl; use common_telemetry::info; use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions}; @@ -39,12 +43,13 @@ use flow::{ get_flow_auth_options, }; use meta_client::{MetaClientOptions, MetaClientType}; +use servers::configurator::GrpcBuilderConfiguratorRef; use snafu::{OptionExt, ResultExt, ensure}; use tracing_appender::non_blocking::WorkerGuard; use crate::error::{ BuildCacheRegistrySnafu, InitMetadataSnafu, LoadLayeredConfigSnafu, MetaClientInitSnafu, - MissingConfigSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu, + MissingConfigSnafu, OtherSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu, }; use crate::options::{GlobalOptions, GreptimeOptions}; use crate::{App, create_resource_limit_metrics, log_versions, maybe_activate_heap_profile}; @@ -55,33 +60,14 @@ type FlownodeOptions = GreptimeOptions; pub struct Instance { flownode: FlownodeInstance, - - // The components of flownode, which make it easier to expand based - // on the components. - #[cfg(feature = "enterprise")] - components: Components, - // Keep the logging guard to prevent the worker from being dropped. _guard: Vec, } -#[cfg(feature = "enterprise")] -pub struct Components { - pub catalog_manager: catalog::CatalogManagerRef, - pub fe_client: Arc, - pub kv_backend: common_meta::kv_backend::KvBackendRef, -} - impl Instance { - pub fn new( - flownode: FlownodeInstance, - #[cfg(feature = "enterprise")] components: Components, - guard: Vec, - ) -> Self { + pub fn new(flownode: FlownodeInstance, guard: Vec) -> Self { Self { flownode, - #[cfg(feature = "enterprise")] - components, _guard: guard, } } @@ -94,11 +80,6 @@ impl Instance { pub fn flownode_mut(&mut self) -> &mut FlownodeInstance { &mut self.flownode } - - #[cfg(feature = "enterprise")] - pub fn components(&self) -> &Components { - &self.components - } } #[async_trait::async_trait] @@ -396,7 +377,7 @@ impl StartCommand { let frontend_client = Arc::new(frontend_client); let flownode_builder = FlownodeBuilder::new( opts.clone(), - plugins, + plugins.clone(), table_metadata_manager, catalog_manager.clone(), flow_metadata_manager, @@ -405,8 +386,29 @@ impl StartCommand { .with_heartbeat_task(heartbeat_task); let mut flownode = flownode_builder.build().await.context(StartFlownodeSnafu)?; + + let builder = + FlownodeServiceBuilder::grpc_server_builder(&opts, flownode.flownode_server()); + let builder = if let Some(configurator) = + plugins.get::>() + { + let context = GrpcConfigureContext { + kv_backend: cached_meta_backend.clone(), + fe_client: frontend_client.clone(), + flownode_id: member_id, + catalog_manager: catalog_manager.clone(), + }; + configurator + .configure(builder, context) + .await + .context(OtherSnafu)? + } else { + builder + }; + let grpc_server = builder.build(); + let services = FlownodeServiceBuilder::new(&opts) - .with_default_grpc_server(flownode.flownode_server()) + .with_grpc_server(grpc_server) .enable_http_service() .build() .context(StartFlownodeSnafu)?; @@ -430,16 +432,14 @@ impl StartCommand { .set_frontend_invoker(invoker) .await; - #[cfg(feature = "enterprise")] - let components = Components { - catalog_manager: catalog_manager.clone(), - fe_client: frontend_client, - kv_backend: cached_meta_backend, - }; - - #[cfg(not(feature = "enterprise"))] - return Ok(Instance::new(flownode, guard)); - #[cfg(feature = "enterprise")] - Ok(Instance::new(flownode, components, guard)) + Ok(Instance::new(flownode, guard)) } } + +/// The context for [`GrpcBuilderConfiguratorRef`] in flownode. +pub struct GrpcConfigureContext { + pub kv_backend: KvBackendRef, + pub fe_client: Arc, + pub flownode_id: FlownodeId, + pub catalog_manager: CatalogManagerRef, +} diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index c5162e5ac0..5be3ff897b 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::fmt::Debug; use std::path::Path; use std::sync::Arc; use std::time::Duration; @@ -19,7 +20,10 @@ use std::time::Duration; use async_trait::async_trait; use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry}; use catalog::information_extension::DistributedInformationExtension; -use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManagerBuilder, MetaKvBackend}; +use catalog::kvbackend::{ + CachedKvBackendBuilder, CatalogManagerConfiguratorRef, KvBackendCatalogManagerBuilder, + MetaKvBackend, +}; use catalog::process_manager::ProcessManager; use clap::Parser; use client::client_manager::NodeClients; @@ -41,14 +45,14 @@ use frontend::frontend::Frontend; use frontend::heartbeat::HeartbeatTask; use frontend::instance::builder::FrontendBuilder; use frontend::server::Services; -use meta_client::{MetaClientOptions, MetaClientType}; +use meta_client::{MetaClientOptions, MetaClientRef, MetaClientType}; use servers::addrs; use servers::grpc::GrpcOptions; use servers::tls::{TlsMode, TlsOption}; use snafu::{OptionExt, ResultExt}; use tracing_appender::non_blocking::WorkerGuard; -use crate::error::{self, Result}; +use crate::error::{self, OtherSnafu, Result}; use crate::options::{GlobalOptions, GreptimeOptions}; use crate::{App, create_resource_limit_metrics, log_versions, maybe_activate_heap_profile}; @@ -416,9 +420,16 @@ impl StartCommand { layered_cache_registry.clone(), ) .with_process_manager(process_manager.clone()); - #[cfg(feature = "enterprise")] - let builder = if let Some(factories) = plugins.get() { - builder.with_extra_information_table_factories(factories) + let builder = if let Some(configurator) = + plugins.get::>() + { + let ctx = CatalogManagerConfigureContext { + meta_client: meta_client.clone(), + }; + configurator + .configure(builder, ctx) + .await + .context(OtherSnafu)? } else { builder }; @@ -471,6 +482,11 @@ impl StartCommand { } } +/// The context for [`CatalogManagerConfigratorRef`] in frontend. +pub struct CatalogManagerConfigureContext { + pub meta_client: MetaClientRef, +} + #[cfg(test)] mod tests { use std::io::Write; diff --git a/src/cmd/src/metasrv.rs b/src/cmd/src/metasrv.rs index 4f71775e74..ee67267de3 100644 --- a/src/cmd/src/metasrv.rs +++ b/src/cmd/src/metasrv.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::fmt; +use std::fmt::{self, Debug}; use std::path::Path; use std::time::Duration; @@ -23,7 +23,7 @@ use common_config::Configurable; use common_telemetry::info; use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions}; use common_version::{short_version, verbose_version}; -use meta_srv::bootstrap::MetasrvInstance; +use meta_srv::bootstrap::{MetasrvInstance, metasrv_builder}; use meta_srv::metasrv::BackendImpl; use snafu::ResultExt; use tracing_appender::non_blocking::WorkerGuard; @@ -177,7 +177,7 @@ pub struct StartCommand { backend: Option, } -impl fmt::Debug for StartCommand { +impl Debug for StartCommand { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("StartCommand") .field("rpc_bind_addr", &self.rpc_bind_addr) @@ -341,7 +341,7 @@ impl StartCommand { .await .context(StartMetaServerSnafu)?; - let builder = meta_srv::bootstrap::metasrv_builder(&opts, plugins, None) + let builder = metasrv_builder(&opts, plugins, None) .await .context(error::BuildMetaServerSnafu)?; let metasrv = builder.build().await.context(error::BuildMetaServerSnafu)?; diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 8d3741111f..4f7cec3552 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::fmt::Debug; use std::net::SocketAddr; use std::path::Path; use std::sync::Arc; @@ -20,7 +21,7 @@ use std::{fs, path}; use async_trait::async_trait; use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry}; use catalog::information_schema::InformationExtensionRef; -use catalog::kvbackend::KvBackendCatalogManagerBuilder; +use catalog::kvbackend::{CatalogManagerConfiguratorRef, KvBackendCatalogManagerBuilder}; use catalog::process_manager::ProcessManager; use clap::Parser; use common_base::Plugins; @@ -31,7 +32,7 @@ use common_meta::cache::LayeredCacheRegistryBuilder; use common_meta::ddl::flow_meta::FlowMetadataAllocator; use common_meta::ddl::table_meta::TableMetadataAllocator; use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl}; -use common_meta::ddl_manager::DdlManager; +use common_meta::ddl_manager::{DdlManager, DdlManagerConfiguratorRef, DdlManagerConfigureContext}; use common_meta::key::flow::FlowMetadataManager; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; @@ -63,7 +64,7 @@ use standalone::StandaloneInformationExtension; use standalone::options::StandaloneOptions; use tracing_appender::non_blocking::WorkerGuard; -use crate::error::{Result, StartFlownodeSnafu}; +use crate::error::{OtherSnafu, Result, StartFlownodeSnafu}; use crate::options::{GlobalOptions, GreptimeOptions}; use crate::{App, create_resource_limit_metrics, error, log_versions, maybe_activate_heap_profile}; @@ -116,34 +117,15 @@ pub struct Instance { flownode: FlownodeInstance, procedure_manager: ProcedureManagerRef, wal_options_allocator: WalOptionsAllocatorRef, - - // The components of standalone, which make it easier to expand based - // on the components. - #[cfg(feature = "enterprise")] - components: Components, - // Keep the logging guard to prevent the worker from being dropped. _guard: Vec, } -#[cfg(feature = "enterprise")] -pub struct Components { - pub plugins: Plugins, - pub kv_backend: KvBackendRef, - pub frontend_client: Arc, - pub catalog_manager: catalog::CatalogManagerRef, -} - impl Instance { /// Find the socket addr of a server by its `name`. pub fn server_addr(&self, name: &str) -> Option { self.frontend.server_handlers().addr(name) } - - #[cfg(feature = "enterprise")] - pub fn components(&self) -> &Components { - &self.components - } } #[async_trait] @@ -415,6 +397,13 @@ impl StartCommand { plugins.insert::(information_extension.clone()); let process_manager = Arc::new(ProcessManager::new(opts.grpc.server_addr.clone(), None)); + + // for standalone not use grpc, but get a handler to frontend grpc client without + // actually make a connection + let (frontend_client, frontend_instance_handler) = + FrontendClient::from_empty_grpc_handler(opts.query.clone()); + let frontend_client = Arc::new(frontend_client); + let builder = KvBackendCatalogManagerBuilder::new( information_extension.clone(), kv_backend.clone(), @@ -422,9 +411,16 @@ impl StartCommand { ) .with_procedure_manager(procedure_manager.clone()) .with_process_manager(process_manager.clone()); - #[cfg(feature = "enterprise")] - let builder = if let Some(factories) = plugins.get() { - builder.with_extra_information_table_factories(factories) + let builder = if let Some(configurator) = + plugins.get::>() + { + let ctx = CatalogManagerConfigureContext { + fe_client: frontend_client.clone(), + }; + configurator + .configure(builder, ctx) + .await + .context(OtherSnafu)? } else { builder }; @@ -439,11 +435,6 @@ impl StartCommand { ..Default::default() }; - // for standalone not use grpc, but get a handler to frontend grpc client without - // actually make a connection - let (frontend_client, frontend_instance_handler) = - FrontendClient::from_empty_grpc_handler(opts.query.clone()); - let frontend_client = Arc::new(frontend_client); let flow_builder = FlownodeBuilder::new( flownode_options, plugins.clone(), @@ -514,11 +505,17 @@ impl StartCommand { let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager.clone(), true) .context(error::InitDdlManagerSnafu)?; - #[cfg(feature = "enterprise")] - let ddl_manager = { - let trigger_ddl_manager: Option = - plugins.get(); - ddl_manager.with_trigger_ddl_manager(trigger_ddl_manager) + + let ddl_manager = if let Some(configurator) = plugins.get::() { + let ctx = DdlManagerConfigureContext { + kv_backend: kv_backend.clone(), + }; + configurator + .configure(ddl_manager, ctx) + .await + .context(OtherSnafu)? + } else { + ddl_manager }; let procedure_executor = Arc::new(LocalProcedureExecutor::new( @@ -574,22 +571,12 @@ impl StartCommand { heartbeat_task: None, }; - #[cfg(feature = "enterprise")] - let components = Components { - plugins, - kv_backend, - frontend_client, - catalog_manager, - }; - Ok(Instance { datanode, frontend, flownode, procedure_manager, wal_options_allocator, - #[cfg(feature = "enterprise")] - components, _guard: guard, }) } @@ -608,6 +595,11 @@ impl StartCommand { } } +/// The context for [`CatalogManagerConfigratorRef`] in standalone. +pub struct CatalogManagerConfigureContext { + pub fe_client: Arc, +} + #[cfg(test)] mod tests { use std::default::Default; diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 9ade13052d..49d547a459 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use common_error::ext::BoxedError; use common_procedure::{ BoxedProcedureLoader, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId, watcher, }; @@ -45,6 +46,7 @@ use crate::error::{ use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; +use crate::kv_backend::KvBackendRef; use crate::procedure_executor::ExecutorContext; #[cfg(feature = "enterprise")] use crate::rpc::ddl::DdlTask::CreateTrigger; @@ -66,6 +68,23 @@ use crate::rpc::ddl::{ }; use crate::rpc::router::RegionRoute; +/// A configurator that customizes or enhances a [`DdlManager`]. +#[async_trait::async_trait] +pub trait DdlManagerConfigurator: Send + Sync { + /// Configures the given [`DdlManager`] using the provided [`DdlManagerConfigureContext`]. + async fn configure( + &self, + ddl_manager: DdlManager, + ctx: DdlManagerConfigureContext, + ) -> std::result::Result; +} + +pub type DdlManagerConfiguratorRef = Arc; + +pub struct DdlManagerConfigureContext { + pub kv_backend: KvBackendRef, +} + pub type DdlManagerRef = Arc; pub type BoxedProcedureLoaderFactory = dyn Fn(DdlContext) -> BoxedProcedureLoader; @@ -148,11 +167,8 @@ impl DdlManager { } #[cfg(feature = "enterprise")] - pub fn with_trigger_ddl_manager( - mut self, - trigger_ddl_manager: Option, - ) -> Self { - self.trigger_ddl_manager = trigger_ddl_manager; + pub fn with_trigger_ddl_manager(mut self, trigger_ddl_manager: TriggerDdlManagerRef) -> Self { + self.trigger_ddl_manager = Some(trigger_ddl_manager); self } diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs index 7a32e0adcb..ff42ba53f2 100644 --- a/src/frontend/src/instance/builder.rs +++ b/src/frontend/src/instance/builder.rs @@ -32,15 +32,18 @@ use operator::flow::FlowServiceOperator; use operator::insert::Inserter; use operator::procedure::ProcedureServiceOperator; use operator::request::Requester; -use operator::statement::{StatementExecutor, StatementExecutorRef}; +use operator::statement::{ + ExecutorConfigureContext, StatementExecutor, StatementExecutorConfiguratorRef, + StatementExecutorRef, +}; use operator::table::TableMutationOperator; use partition::manager::PartitionRuleManager; use pipeline::pipeline_operator::PipelineOperator; use query::QueryEngineFactory; use query::region_query::RegionQueryHandlerFactoryRef; -use snafu::OptionExt; +use snafu::{OptionExt, ResultExt}; -use crate::error::{self, Result}; +use crate::error::{self, ExternalSnafu, Result}; use crate::events::EventHandlerImpl; use crate::frontend::FrontendOptions; use crate::instance::Instance; @@ -187,10 +190,15 @@ impl FrontendBuilder { Some(process_manager.clone()), ); - #[cfg(feature = "enterprise")] let statement_executor = - if let Some(factory) = plugins.get::() { - statement_executor.with_trigger_querier(factory.create(kv_backend.clone())) + if let Some(configurator) = plugins.get::() { + let ctx = ExecutorConfigureContext { + kv_backend: kv_backend.clone(), + }; + configurator + .configure(statement_executor, ctx) + .await + .context(ExternalSnafu)? } else { statement_executor }; diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 9647d9a3ef..351853d2bf 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -29,7 +29,7 @@ use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; use common_telemetry::info; use either::Either; -use servers::configurator::ConfiguratorRef; +use servers::configurator::GrpcRouterConfiguratorRef; use servers::http::{HttpServer, HttpServerBuilder}; use servers::metrics_handler::MetricsHandler; use servers::server::Server; @@ -44,6 +44,7 @@ use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef}; #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))] use crate::election::CANDIDATE_LEASE_SECS; use crate::election::etcd::EtcdElection; +use crate::error::OtherSnafu; use crate::metasrv::builder::MetasrvBuilder; use crate::metasrv::{ BackendImpl, ElectionRef, Metasrv, MetasrvOptions, SelectTarget, SelectorRef, @@ -131,8 +132,15 @@ impl MetasrvInstance { // Start gRPC server with admin services for backward compatibility let mut router = router(self.metasrv.clone()); - if let Some(configurator) = self.metasrv.plugins().get::() { - router = configurator.config_grpc(router); + if let Some(configurator) = self + .metasrv + .plugins() + .get::>() + { + router = configurator + .configure_grpc_router(router, ()) + .await + .context(OtherSnafu)?; } let (serve_state_tx, serve_state_rx) = oneshot::channel(); diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 04b5bd02c6..be9e3a0fad 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -28,7 +28,7 @@ use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocato use common_meta::ddl::{ DdlContext, NoopRegionFailureDetectorControl, RegionFailureDetectorControllerRef, }; -use common_meta::ddl_manager::DdlManager; +use common_meta::ddl_manager::{DdlManager, DdlManagerConfiguratorRef, DdlManagerConfigureContext}; use common_meta::distributed_time_constants::{self}; use common_meta::key::TableMetadataManager; use common_meta::key::flow::FlowMetadataManager; @@ -54,7 +54,7 @@ use store_api::storage::MAX_REGION_SEQ; use crate::bootstrap::build_default_meta_peer_client; use crate::cache_invalidator::MetasrvCacheInvalidator; use crate::cluster::MetaPeerClientRef; -use crate::error::{self, BuildWalOptionsAllocatorSnafu, Result}; +use crate::error::{self, BuildWalOptionsAllocatorSnafu, OtherSnafu, Result}; use crate::events::EventHandlerImpl; use crate::gc::GcScheduler; use crate::greptimedb_telemetry::get_greptimedb_telemetry_task; @@ -402,13 +402,22 @@ impl MetasrvBuilder { let procedure_manager_c = procedure_manager.clone(); let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager_c, true) .context(error::InitDdlManagerSnafu)?; - #[cfg(feature = "enterprise")] - let ddl_manager = { - let trigger_ddl_manager = plugins.as_ref().and_then(|plugins| { - plugins.get::() - }); - ddl_manager.with_trigger_ddl_manager(trigger_ddl_manager) + + let ddl_manager = if let Some(configurator) = plugins + .as_ref() + .and_then(|p| p.get::()) + { + let ctx = DdlManagerConfigureContext { + kv_backend: kv_backend.clone(), + }; + configurator + .configure(ddl_manager, ctx) + .await + .context(OtherSnafu)? + } else { + ddl_manager }; + let ddl_manager = Arc::new(ddl_manager); let region_flush_ticker = if is_remote_wal { diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs index e4abf941c0..2faa4b0c48 100644 --- a/src/operator/src/statement.rs +++ b/src/operator/src/statement.rs @@ -88,6 +88,22 @@ use crate::insert::InserterRef; use crate::statement::copy_database::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY}; use crate::statement::set::set_allow_query_fallback; +/// A configurator that customizes or enhances a [`StatementExecutor`]. +#[async_trait::async_trait] +pub trait StatementExecutorConfigurator: Send + Sync { + async fn configure( + &self, + executor: StatementExecutor, + ctx: ExecutorConfigureContext, + ) -> std::result::Result; +} + +pub type StatementExecutorConfiguratorRef = Arc; + +pub struct ExecutorConfigureContext { + pub kv_backend: KvBackendRef, +} + #[derive(Clone)] pub struct StatementExecutor { catalog_manager: CatalogManagerRef, @@ -106,15 +122,6 @@ pub struct StatementExecutor { pub type StatementExecutorRef = Arc; -/// Trait for creating [`TriggerQuerier`] instance. -#[cfg(feature = "enterprise")] -pub trait TriggerQuerierFactory: Send + Sync { - fn create(&self, kv_backend: KvBackendRef) -> TriggerQuerierRef; -} - -#[cfg(feature = "enterprise")] -pub type TriggerQuerierFactoryRef = Arc; - /// Trait for querying trigger info, such as `SHOW CREATE TRIGGER` etc. #[cfg(feature = "enterprise")] #[async_trait::async_trait] diff --git a/src/servers/src/configurator.rs b/src/servers/src/configurator.rs index 2b2e47d1e7..e8ba8264bd 100644 --- a/src/servers/src/configurator.rs +++ b/src/servers/src/configurator.rs @@ -15,16 +15,45 @@ use std::sync::Arc; use axum::Router as HttpRouter; +use common_error::ext::BoxedError; use tonic::transport::server::Router as GrpcRouter; -pub trait Configurator: Send + Sync { - fn config_http(&self, route: HttpRouter) -> HttpRouter { - route - } +use crate::grpc::builder::GrpcServerBuilder; - fn config_grpc(&self, route: GrpcRouter) -> GrpcRouter { - route - } +/// A configurator that customizes or enhances an HTTP router. +#[async_trait::async_trait] +pub trait HttpConfigurator: Send + Sync { + /// Configures the given HTTP router using the provided context. + async fn configure_http( + &self, + route: HttpRouter, + ctx: C, + ) -> std::result::Result; } -pub type ConfiguratorRef = Arc; +pub type HttpConfiguratorRef = Arc>; + +/// A configurator that customizes or enhances a gRPC router. +#[async_trait::async_trait] +pub trait GrpcRouterConfigurator: Send + Sync { + /// Configures the given gRPC router using the provided context. + async fn configure_grpc_router( + &self, + route: GrpcRouter, + ctx: C, + ) -> std::result::Result; +} + +pub type GrpcRouterConfiguratorRef = Arc>; + +/// A configurator that customizes or enhances a [`GrpcServerBuilder`]. +#[async_trait::async_trait] +pub trait GrpcBuilderConfigurator: Send + Sync { + async fn configure( + &self, + builder: GrpcServerBuilder, + ctx: C, + ) -> std::result::Result; +} + +pub type GrpcBuilderConfiguratorRef = Arc>; diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index d460c905f3..543054ae54 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -50,10 +50,11 @@ use tower_http::trace::TraceLayer; use self::authorize::AuthState; use self::result::table_result::TableResponse; -use crate::configurator::ConfiguratorRef; +use crate::configurator::HttpConfiguratorRef; use crate::elasticsearch; use crate::error::{ - AddressBindSnafu, AlreadyStartedSnafu, Error, InternalIoSnafu, InvalidHeaderValueSnafu, Result, + AddressBindSnafu, AlreadyStartedSnafu, Error, InternalIoSnafu, InvalidHeaderValueSnafu, + OtherSnafu, Result, }; use crate::http::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2}; use crate::http::otlp::OtlpState; @@ -1205,8 +1206,11 @@ impl Server for HttpServer { ); let mut app = self.make_app(); - if let Some(configurator) = self.plugins.get::() { - app = configurator.config_http(app); + if let Some(configurator) = self.plugins.get::>() { + app = configurator + .configure_http(app, ()) + .await + .context(OtherSnafu)?; } let app = self.build(app)?; let listener = tokio::net::TcpListener::bind(listening)