From 74ff5c37eaf45484d10f702b25e2aded92aa6eba Mon Sep 17 00:00:00 2001 From: LFC <990479+MichaelScofield@users.noreply.github.com> Date: Fri, 13 Mar 2026 17:25:21 +0800 Subject: [PATCH] refactor: customize standalone instance build (#7807) * refactor: customize standalone instance build Signed-off-by: luofucong * resolve PR comments Signed-off-by: luofucong --------- Signed-off-by: luofucong --- src/cmd/src/standalone.rs | 186 ++++++++++++++++++++--- tests/conf/datanode-test.toml.template | 2 +- tests/conf/frontend-test.toml.template | 4 +- tests/conf/standalone-test.toml.template | 6 +- tests/runner/src/server_mode.rs | 63 +++----- 5 files changed, 187 insertions(+), 74 deletions(-) diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 92638d3c4a..215bea0ec5 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -32,14 +32,15 @@ use common_meta::cache::LayeredCacheRegistryBuilder; use common_meta::ddl::flow_meta::FlowMetadataAllocator; use common_meta::ddl::table_meta::TableMetadataAllocator; use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl}; -use common_meta::ddl_manager::{DdlManager, DdlManagerConfiguratorRef}; +use common_meta::ddl_manager::{DdlManager, DdlManagerConfiguratorRef, DdlManagerRef}; use common_meta::key::flow::FlowMetadataManager; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; -use common_meta::procedure_executor::LocalProcedureExecutor; +use common_meta::node_manager::{FlownodeRef, NodeManagerRef}; +use common_meta::procedure_executor::{LocalProcedureExecutor, ProcedureExecutorRef}; use common_meta::region_keeper::MemoryRegionKeeper; use common_meta::region_registry::LeaderRegionRegistry; -use common_meta::sequence::SequenceBuilder; +use common_meta::sequence::{Sequence, SequenceBuilder}; use common_meta::wal_provider::{WalProviderRef, build_wal_provider}; use common_procedure::ProcedureManagerRef; use 
common_query::prelude::set_default_prefix; @@ -49,6 +50,7 @@ use common_time::timezone::set_default_timezone; use common_version::{short_version, verbose_version}; use datanode::config::DatanodeOptions; use datanode::datanode::{Datanode, DatanodeBuilder}; +use datanode::region_server::RegionServer; use flow::{ FlownodeBuilder, FlownodeInstance, FlownodeOptions, FrontendClient, FrontendInvoker, GrpcQueryHandlerWithBoxedError, @@ -58,6 +60,7 @@ use frontend::instance::StandaloneDatanodeManager; use frontend::instance::builder::FrontendBuilder; use frontend::server::Services; use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ}; +use plugins::PluginOptions; use plugins::frontend::context::{ CatalogManagerConfigureContext, StandaloneCatalogManagerConfigureContext, }; @@ -130,6 +133,18 @@ impl Instance { pub fn server_addr(&self, name: &str) -> Option { self.frontend.server_handlers().addr(name) } + + /// Get the mutable Frontend component of this Standalone instance for external modification + /// by others (might not be in this code base, so don't delete this function). + pub fn mut_frontend(&mut self) -> &mut Frontend { + &mut self.frontend + } + + /// Get the Datanode component of this Standalone instance for external use + /// by others (might not be in this code base, so don't delete this function). 
+ pub fn datanode(&self) -> &Datanode { + &self.datanode + } } #[async_trait] @@ -342,9 +357,18 @@ impl StartCommand { info!("Standalone start command: {:#?}", self); info!("Standalone options: {opts:#?}"); + let (mut instance, _) = + Self::build_with(opts.component, opts.plugins, InstanceCreator::default()).await?; + instance._guard.extend(guard); + Ok(instance) + } + + pub async fn build_with( + mut opts: StandaloneOptions, + plugin_opts: Vec, + creator: InstanceCreator, + ) -> Result<(Instance, InstanceCreatorResult)> { let mut plugins = Plugins::new(); - let plugin_opts = opts.plugins; - let mut opts = opts.component; set_default_prefix(opts.default_column_prefix.as_deref()) .map_err(BoxedError::new) .context(error::BuildCliSnafu)?; @@ -462,17 +486,16 @@ impl StartCommand { .await; } - let node_manager = Arc::new(StandaloneDatanodeManager { - region_server: datanode.region_server(), - flow_server: flownode.flow_engine(), - }); + let node_manager = creator + .node_manager_creator + .create( + &kv_backend, + datanode.region_server(), + flownode.flow_engine(), + ) + .await?; - let table_id_allocator = Arc::new( - SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone()) - .initial(MIN_USER_TABLE_ID as u64) - .step(10) - .build(), - ); + let table_id_allocator = creator.table_id_allocator_creator.create(&kv_backend); let flow_id_sequence = Arc::new( SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone()) .initial(MIN_USER_FLOW_ID as u64) @@ -489,7 +512,7 @@ impl StartCommand { .context(error::BuildWalProviderSnafu)?; let wal_provider = Arc::new(wal_provider); let table_metadata_allocator = Arc::new(TableMetadataAllocator::new( - table_id_allocator, + table_id_allocator.clone(), wal_provider.clone(), )); let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator( @@ -532,10 +555,10 @@ impl StartCommand { ddl_manager }; - let procedure_executor = Arc::new(LocalProcedureExecutor::new( - Arc::new(ddl_manager), - procedure_manager.clone(), - )); + 
let procedure_executor = creator + .procedure_executor_creator + .create(Arc::new(ddl_manager), procedure_manager.clone()) + .await?; let fe_instance = FrontendBuilder::new( fe_opts.clone(), @@ -568,7 +591,7 @@ impl StartCommand { kv_backend.clone(), layered_cache_registry.clone(), procedure_executor, - node_manager, + node_manager.clone(), ) .await .context(StartFlownodeSnafu)?; @@ -584,14 +607,20 @@ impl StartCommand { heartbeat_task: None, }; - Ok(Instance { + let instance = Instance { datanode, frontend, flownode, procedure_manager, wal_provider, - _guard: guard, - }) + _guard: vec![], + }; + let result = InstanceCreatorResult { + kv_backend, + node_manager, + table_id_allocator, + }; + Ok((instance, result)) } pub async fn create_table_metadata_manager( @@ -608,6 +637,115 @@ impl StartCommand { } } +#[async_trait] +pub trait NodeManagerCreator { + async fn create( + &self, + kv_backend: &KvBackendRef, + region_server: RegionServer, + flow_server: FlownodeRef, + ) -> Result; +} + +pub struct DefaultNodeManagerCreator; + +#[async_trait] +impl NodeManagerCreator for DefaultNodeManagerCreator { + async fn create( + &self, + _: &KvBackendRef, + region_server: RegionServer, + flow_server: FlownodeRef, + ) -> Result { + Ok(Arc::new(StandaloneDatanodeManager { + region_server, + flow_server, + })) + } +} + +pub trait TableIdAllocatorCreator { + fn create(&self, kv_backend: &KvBackendRef) -> Arc; +} + +struct DefaultTableIdAllocatorCreator; + +impl TableIdAllocatorCreator for DefaultTableIdAllocatorCreator { + fn create(&self, kv_backend: &KvBackendRef) -> Arc { + Arc::new( + SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone()) + .initial(MIN_USER_TABLE_ID as u64) + .step(10) + .build(), + ) + } +} + +#[async_trait] +pub trait ProcedureExecutorCreator { + async fn create( + &self, + ddl_manager: DdlManagerRef, + procedure_manager: ProcedureManagerRef, + ) -> Result; +} + +pub struct DefaultProcedureExecutorCreator; + +#[async_trait] +impl ProcedureExecutorCreator for 
DefaultProcedureExecutorCreator { + async fn create( + &self, + ddl_manager: DdlManagerRef, + procedure_manager: ProcedureManagerRef, + ) -> Result { + Ok(Arc::new(LocalProcedureExecutor::new( + ddl_manager, + procedure_manager, + ))) + } +} + +/// `InstanceCreator` is used for grouping various component creators for building the +/// Standalone instance, suitable for customizing how the instance can be built. +pub struct InstanceCreator { + node_manager_creator: Box, + table_id_allocator_creator: Box, + procedure_executor_creator: Box, +} + +impl InstanceCreator { + pub fn new( + node_manager_creator: Box, + table_id_allocator_creator: Box, + procedure_executor_creator: Box, + ) -> Self { + Self { + node_manager_creator, + table_id_allocator_creator, + procedure_executor_creator, + } + } +} + +impl Default for InstanceCreator { + fn default() -> Self { + Self { + node_manager_creator: Box::new(DefaultNodeManagerCreator), + table_id_allocator_creator: Box::new(DefaultTableIdAllocatorCreator), + procedure_executor_creator: Box::new(DefaultProcedureExecutorCreator), + } + } +} + +/// `InstanceCreatorResult` is expected to be used together with [InstanceCreator]. +/// It stores the created components and other important ones for later reuse. 
+pub struct InstanceCreatorResult { + pub kv_backend: KvBackendRef, + pub node_manager: NodeManagerRef, + pub table_id_allocator: Arc, +} + #[cfg(test)] mod tests { use std::default::Default; diff --git a/tests/conf/datanode-test.toml.template b/tests/conf/datanode-test.toml.template index 4cb0423c72..3ec8a2f695 100644 --- a/tests/conf/datanode-test.toml.template +++ b/tests/conf/datanode-test.toml.template @@ -28,7 +28,7 @@ type = 'File' data_home = '{data_home}' [meta_client_options] -metasrv_addrs = ['{metasrv_addr}'] +metasrv_addrs = ['{addrs.metasrv_addr}'] timeout_millis = 3000 connect_timeout_millis = 5000 tcp_nodelay = false diff --git a/tests/conf/frontend-test.toml.template b/tests/conf/frontend-test.toml.template index de4ce86adc..25d44ff6e4 100644 --- a/tests/conf/frontend-test.toml.template +++ b/tests/conf/frontend-test.toml.template @@ -1,3 +1,3 @@ [grpc] -bind_addr = "{grpc_addr}" -server_addr = "{grpc_addr}" +bind_addr = "{addrs.grpc_addr}" +server_addr = "{addrs.grpc_addr}" diff --git a/tests/conf/standalone-test.toml.template b/tests/conf/standalone-test.toml.template index 509eac7ca6..50c014e991 100644 --- a/tests/conf/standalone-test.toml.template +++ b/tests/conf/standalone-test.toml.template @@ -26,12 +26,12 @@ type = 'File' data_home = '{data_home}' [grpc] -bind_addr = '{grpc_addr}' +bind_addr = '{addrs.grpc_addr}' runtime_size = 8 [mysql] enable = true -addr = "{mysql_addr}" +addr = "{addrs.mysql_addr}" runtime_size = 2 prepared_stmt_cache_size= 10000 @@ -40,7 +40,7 @@ mode = "disable" [postgres] enable = true -addr = "{postgres_addr}" +addr = "{addrs.postgres_addr}" runtime_size = 2 [procedure] diff --git a/tests/runner/src/server_mode.rs b/tests/runner/src/server_mode.rs index 172baf32ff..1f7cb72bf4 100644 --- a/tests/runner/src/server_mode.rs +++ b/tests/runner/src/server_mode.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::path::Path; use std::sync::{Mutex, OnceLock}; @@ -96,15 +96,7 @@ struct ConfigContext { use_etcd: bool, store_addrs: String, instance_id: usize, - // for following addrs, leave it empty if not needed - // required for datanode - metasrv_addr: String, - // for frontend and standalone - grpc_addr: String, - // for standalone - mysql_addr: String, - // for standalone - postgres_addr: String, + addrs: HashMap, // enable flat format for storage engine enable_flat_format: bool, } @@ -275,40 +267,26 @@ impl ServerMode { let procedure_dir = data_home.join("procedure").display().to_string(); // Get the required addresses based on server mode - let (metasrv_addr, grpc_addr, mysql_addr, postgres_addr) = match self { + let addrs: HashMap = match self { ServerMode::Standalone { rpc_bind_addr, mysql_addr, postgres_addr, - .. - } => ( - String::new(), - rpc_bind_addr.clone(), - mysql_addr.clone(), - postgres_addr.clone(), - ), - ServerMode::Frontend { - rpc_bind_addr, - mysql_addr, - postgres_addr, - .. - } => ( - String::new(), - rpc_bind_addr.clone(), - mysql_addr.clone(), - postgres_addr.clone(), - ), - ServerMode::Datanode { - rpc_bind_addr, - metasrv_addr, - .. - } => ( - metasrv_addr.clone(), - rpc_bind_addr.clone(), - String::new(), - String::new(), - ), - _ => (String::new(), String::new(), String::new(), String::new()), + http_addr, + } => [ + ("http_addr".to_string(), http_addr.clone()), + ("grpc_addr".to_string(), rpc_bind_addr.clone()), + ("mysql_addr".to_string(), mysql_addr.clone()), + ("postgres_addr".to_string(), postgres_addr.clone()), + ] + .into(), + ServerMode::Frontend { rpc_bind_addr, .. } => { + [("grpc_addr".to_string(), rpc_bind_addr.clone())].into() + } + ServerMode::Datanode { metasrv_addr, .. 
} => { + [("metasrv_addr".to_string(), metasrv_addr.clone())].into() + } + _ => HashMap::new(), }; let ctx = ConfigContext { @@ -326,10 +304,7 @@ impl ServerMode { .collect::>() .join(","), instance_id: id, - metasrv_addr, - grpc_addr, - mysql_addr, - postgres_addr, + addrs, enable_flat_format: db_ctx.store_config().enable_flat_format, };