diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 4146e54e56..9703bff640 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -30,20 +30,16 @@ use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID}; use common_config::{metadata_store_dir, Configurable, KvBackendConfig}; use common_error::ext::BoxedError; use common_meta::cache::LayeredCacheRegistryBuilder; -use common_meta::cache_invalidator::CacheInvalidatorRef; use common_meta::cluster::{NodeInfo, NodeStatus}; use common_meta::datanode::RegionStat; -use common_meta::ddl::flow_meta::{FlowMetadataAllocator, FlowMetadataAllocatorRef}; -use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef}; +use common_meta::ddl::flow_meta::FlowMetadataAllocator; +use common_meta::ddl::table_meta::TableMetadataAllocator; use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl, ProcedureExecutorRef}; use common_meta::ddl_manager::DdlManager; -#[cfg(feature = "enterprise")] -use common_meta::ddl_manager::TriggerDdlManagerRef; use common_meta::key::flow::flow_state::FlowStat; -use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef}; +use common_meta::key::flow::FlowMetadataManager; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; -use common_meta::node_manager::NodeManagerRef; use common_meta::peer::Peer; use common_meta::region_keeper::MemoryRegionKeeper; use common_meta::region_registry::LeaderRegionRegistry; @@ -594,28 +590,36 @@ impl StartCommand { .await .context(error::BuildWalOptionsAllocatorSnafu)?; let wal_options_allocator = Arc::new(wal_options_allocator); - let table_meta_allocator = Arc::new(TableMetadataAllocator::new( + let table_metadata_allocator = Arc::new(TableMetadataAllocator::new( table_id_sequence, wal_options_allocator.clone(), )); - let flow_meta_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator( + let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator( flow_id_sequence, )); + let ddl_context = DdlContext { + node_manager: node_manager.clone(), + cache_invalidator: layered_cache_registry.clone(), + memory_region_keeper: Arc::new(MemoryRegionKeeper::default()), + leader_region_registry: Arc::new(LeaderRegionRegistry::default()), + table_metadata_manager: table_metadata_manager.clone(), + table_metadata_allocator: table_metadata_allocator.clone(), + flow_metadata_manager: flow_metadata_manager.clone(), + flow_metadata_allocator: flow_metadata_allocator.clone(), + region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl), + }; + let procedure_manager_c = procedure_manager.clone(); + + let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager_c, true) + .context(error::InitDdlManagerSnafu)?; #[cfg(feature = "enterprise")] - let trigger_ddl_manager: Option = plugins.get(); - let ddl_task_executor = Self::create_ddl_task_executor( - procedure_manager.clone(), - node_manager.clone(), - layered_cache_registry.clone(), - table_metadata_manager, - table_meta_allocator, - flow_metadata_manager, - flow_meta_allocator, - #[cfg(feature = "enterprise")] - trigger_ddl_manager, - ) - .await?; + let ddl_manager = { + let trigger_ddl_manager: Option = + plugins.get(); + ddl_manager.with_trigger_ddl_manager(trigger_ddl_manager) + }; + let ddl_task_executor: ProcedureExecutorRef = Arc::new(ddl_manager); let fe_instance = FrontendBuilder::new( fe_opts.clone(), @@ -679,41 +683,6 @@ impl StartCommand { }) } - #[allow(clippy::too_many_arguments)] - pub async fn create_ddl_task_executor( - procedure_manager: ProcedureManagerRef, - node_manager: NodeManagerRef, - cache_invalidator: CacheInvalidatorRef, - table_metadata_manager: TableMetadataManagerRef, - table_metadata_allocator: TableMetadataAllocatorRef, - flow_metadata_manager: FlowMetadataManagerRef, - flow_metadata_allocator: FlowMetadataAllocatorRef, - #[cfg(feature = "enterprise")] trigger_ddl_manager: Option, - ) -> Result { - let procedure_executor: ProcedureExecutorRef = Arc::new( - DdlManager::try_new( - DdlContext { - node_manager, - cache_invalidator, - memory_region_keeper: Arc::new(MemoryRegionKeeper::default()), - leader_region_registry: Arc::new(LeaderRegionRegistry::default()), - table_metadata_manager, - table_metadata_allocator, - flow_metadata_manager, - flow_metadata_allocator, - region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl), - }, - procedure_manager, - true, - #[cfg(feature = "enterprise")] - trigger_ddl_manager, - ) - .context(error::InitDdlManagerSnafu)?, - ); - - Ok(procedure_executor) - } - pub async fn create_table_metadata_manager( kv_backend: KvBackendRef, ) -> Result { diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index abd739ae4a..f665796713 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -125,13 +125,12 @@ impl DdlManager { ddl_context: DdlContext, procedure_manager: ProcedureManagerRef, register_loaders: bool, - #[cfg(feature = "enterprise")] trigger_ddl_manager: Option, ) -> Result { let manager = Self { ddl_context, procedure_manager, #[cfg(feature = "enterprise")] - trigger_ddl_manager, + trigger_ddl_manager: None, }; if register_loaders { manager.register_loaders()?; @@ -139,6 +138,15 @@ impl DdlManager { Ok(manager) } + #[cfg(feature = "enterprise")] + pub fn with_trigger_ddl_manager( + mut self, + trigger_ddl_manager: Option, + ) -> Self { + self.trigger_ddl_manager = trigger_ddl_manager; + self + } + /// Returns the [TableMetadataManagerRef]. pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef { &self.ddl_context.table_metadata_manager @@ -964,8 +972,6 @@ mod tests { }, procedure_manager.clone(), true, - #[cfg(feature = "enterprise")] - None, ); let expected_loaders = vec![ diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 167c5afd8e..c9c1b8dbdf 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -353,30 +353,28 @@ impl MetasrvBuilder { let leader_region_registry = Arc::new(LeaderRegionRegistry::default()); + let ddl_context = DdlContext { + node_manager, + cache_invalidator: cache_invalidator.clone(), + memory_region_keeper: memory_region_keeper.clone(), + leader_region_registry: leader_region_registry.clone(), + table_metadata_manager: table_metadata_manager.clone(), + table_metadata_allocator: table_metadata_allocator.clone(), + flow_metadata_manager: flow_metadata_manager.clone(), + flow_metadata_allocator: flow_metadata_allocator.clone(), + region_failure_detector_controller, + }; + let procedure_manager_c = procedure_manager.clone(); + let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager_c, true) + .context(error::InitDdlManagerSnafu)?; #[cfg(feature = "enterprise")] - let trigger_ddl_manager = plugins - .as_ref() - .and_then(|plugins| plugins.get::()); - let ddl_manager = Arc::new( - DdlManager::try_new( - DdlContext { - node_manager, - cache_invalidator: cache_invalidator.clone(), - memory_region_keeper: memory_region_keeper.clone(), - leader_region_registry: leader_region_registry.clone(), - table_metadata_manager: table_metadata_manager.clone(), - table_metadata_allocator: table_metadata_allocator.clone(), - flow_metadata_manager: flow_metadata_manager.clone(), - flow_metadata_allocator: flow_metadata_allocator.clone(), - region_failure_detector_controller, - }, - procedure_manager.clone(), - true, - #[cfg(feature = "enterprise")] - trigger_ddl_manager, - ) - .context(error::InitDdlManagerSnafu)?, - ); + let ddl_manager = { + let trigger_ddl_manager = plugins.as_ref().and_then(|plugins| { + plugins.get::() + }); + ddl_manager.with_trigger_ddl_manager(trigger_ddl_manager) + }; + let ddl_manager = Arc::new(ddl_manager); // remote WAL prune ticker and manager let wal_prune_ticker = if is_remote_wal && options.wal.enable_active_wal_pruning() { diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index f576206cc0..f2f538f528 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -6,14 +6,6 @@ license.workspace = true [features] dashboard = [] -enterprise = [ - "cmd/enterprise", - "common-meta/enterprise", - "frontend/enterprise", - "meta-srv/enterprise", - "operator/enterprise", - "sql/enterprise", -] [lints] workspace = true diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 9935bce716..e14b08e15e 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -228,8 +228,6 @@ impl GreptimeDbStandaloneBuilder { }, procedure_manager.clone(), register_procedure_loaders, - #[cfg(feature = "enterprise")] - None, ) .unwrap(), );