diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 7381075f3e..6f35e74b65 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -125,6 +125,7 @@ pub struct Instance { flownode: FlownodeInstance, procedure_manager: ProcedureManagerRef, wal_provider: WalProviderRef, + leader_services_controller: Box, // Keep the logging guard to prevent the worker from being dropped. _guard: Vec, } @@ -157,15 +158,13 @@ impl App for Instance { async fn start(&mut self) -> Result<()> { self.datanode.start_telemetry(); - self.procedure_manager - .start() - .await - .context(error::StartProcedureManagerSnafu)?; - - self.wal_provider - .start() - .await - .context(error::StartWalProviderSnafu)?; + self.leader_services_controller + .start( + self.procedure_manager.clone(), + self.wal_provider.clone(), + self.datanode.region_server(), + ) + .await?; plugins::start_frontend_plugins(self.frontend.instance.plugins().clone()) .await @@ -187,10 +186,12 @@ impl App for Instance { .await .context(error::ShutdownFrontendSnafu)?; - self.procedure_manager - .stop() - .await - .context(error::StopProcedureManagerSnafu)?; + self.leader_services_controller + .stop( + self.procedure_manager.clone(), + self.datanode.region_server(), + ) + .await?; self.datanode .shutdown() @@ -420,6 +421,9 @@ impl StartCommand { let mut builder = DatanodeBuilder::new(dn_opts, plugins.clone(), kv_backend.clone()); builder.with_cache_registry(layered_cache_registry.clone()); + if let Some(writable) = creator.open_regions_writable_override { + builder.with_open_regions_writable_override(writable); + } let datanode = builder.build().await.context(error::StartDatanodeSnafu)?; let information_extension = Arc::new(StandaloneInformationExtension::new( @@ -617,6 +621,7 @@ impl StartCommand { flownode, procedure_manager, wal_provider, + leader_services_controller: creator.leader_services_controller, _guard: vec![], }; let result = InstanceCreatorResult { @@ -642,7 +647,7 @@ impl 
StartCommand { } #[async_trait] -pub trait NodeManagerCreator { +pub trait NodeManagerCreator: Send + Sync { async fn create( &self, kv_backend: &KvBackendRef, @@ -688,7 +693,7 @@ impl MetadataKvBackendCreator for DefaultMetadataKvBackendCreator { } } -pub trait TableIdAllocatorCreator { +pub trait TableIdAllocatorCreator: Send + Sync { fn create(&self, kv_backend: &KvBackendRef) -> Arc; } @@ -706,7 +711,7 @@ impl TableIdAllocatorCreator for DefaultTableIdAllocatorCreator { } #[async_trait] -pub trait ProcedureExecutorCreator { +pub trait ProcedureExecutorCreator: Send + Sync { async fn create( &self, ddl_manager: DdlManagerRef, @@ -730,6 +735,59 @@ impl ProcedureExecutorCreator for DefaultProcedureExecutorCreator { } } +#[async_trait] +pub trait StandaloneLeaderServicesController: Send + Sync { + /// Starts services that manage standalone metadata or WAL state. + /// + /// The default implementation starts the procedure manager and WAL provider + /// during instance startup. + async fn start( + &self, + procedure_manager: ProcedureManagerRef, + wal_provider: WalProviderRef, + region_server: RegionServer, + ) -> Result<()>; + + /// Stops services started by [`StandaloneLeaderServicesController::start`]. 
+ async fn stop( + &self, + procedure_manager: ProcedureManagerRef, + region_server: RegionServer, + ) -> Result<()>; +} + +pub struct DefaultStandaloneLeaderServicesController; + +#[async_trait] +impl StandaloneLeaderServicesController for DefaultStandaloneLeaderServicesController { + async fn start( + &self, + procedure_manager: ProcedureManagerRef, + wal_provider: WalProviderRef, + _region_server: RegionServer, + ) -> Result<()> { + procedure_manager + .start() + .await + .context(error::StartProcedureManagerSnafu)?; + wal_provider + .start() + .await + .context(error::StartWalProviderSnafu) + } + + async fn stop( + &self, + procedure_manager: ProcedureManagerRef, + _region_server: RegionServer, + ) -> Result<()> { + procedure_manager + .stop() + .await + .context(error::StopProcedureManagerSnafu) + } +} + /// `InstanceCreator` is used for grouping various component creators for building the /// Standalone instance, suitable for customizing how the instance can be built. pub struct InstanceCreator { @@ -739,6 +797,8 @@ pub struct InstanceCreator { node_manager_creator: Box, table_id_allocator_creator: Box, procedure_executor_creator: Box, + leader_services_controller: Box, + open_regions_writable_override: Option, } impl InstanceCreator { @@ -752,6 +812,8 @@ impl InstanceCreator { node_manager_creator, table_id_allocator_creator, procedure_executor_creator, + leader_services_controller: Box::new(DefaultStandaloneLeaderServicesController), + open_regions_writable_override: None, } } @@ -762,6 +824,57 @@ impl InstanceCreator { self.metadata_kv_backend_creator = metadata_kv_backend_creator; self } + + /// Wraps the metadata backend creator while retaining the default creator. + /// + /// This is useful for callers that need to add runtime behavior around + /// metadata access without reimplementing backend selection. 
+ pub fn map_metadata_kv_backend_creator(mut self, f: F) -> Self + where + F: FnOnce(Box) -> Box, + { + self.metadata_kv_backend_creator = f(self.metadata_kv_backend_creator); + self + } + + /// Wraps node-manager creation while preserving the selected standalone node manager. + pub fn map_node_manager_creator(mut self, f: F) -> Self + where + F: FnOnce(Box) -> Box, + { + self.node_manager_creator = f(self.node_manager_creator); + self + } + + /// Wraps procedure-executor creation while preserving the current setup. + pub fn map_procedure_executor_creator(mut self, f: F) -> Self + where + F: FnOnce(Box) -> Box, + { + self.procedure_executor_creator = f(self.procedure_executor_creator); + self + } + + /// Replaces startup/shutdown ownership for procedure manager and WAL provider. + pub fn with_leader_services_controller( + mut self, + leader_services_controller: Box, + ) -> Self { + self.leader_services_controller = leader_services_controller; + self + } + + /// Overrides whether regions opened during startup should become writable. + /// + /// When unset, the default startup behavior is kept (regions open writable). + /// + /// Warning: setting this to `false` in standalone mode will leave reopened regions + /// permanently read-only. Standalone has no metasrv heartbeat or region-role + /// reconciliation, so there is no path to promote regions to Leader after startup. 
+ pub fn with_open_regions_writable_override(mut self, writable: bool) -> Self { + self.open_regions_writable_override = Some(writable); + self + } } impl Default for InstanceCreator { @@ -771,6 +884,8 @@ impl Default for InstanceCreator { node_manager_creator: Box::new(DefaultNodeManagerCreator), table_id_allocator_creator: Box::new(DefaultTableIdAllocatorCreator), procedure_executor_creator: Box::new(DefaultProcedureExecutorCreator), + leader_services_controller: Box::new(DefaultStandaloneLeaderServicesController), + open_regions_writable_override: None, } } } diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index c848215d39..9a2fe3d982 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -163,6 +163,7 @@ pub struct DatanodeBuilder { kv_backend: KvBackendRef, cache_registry: Option>, topic_stats_reporter: Option>, + open_regions_writable_override: Option, #[cfg(feature = "enterprise")] extension_range_provider_factory: Option, } @@ -176,6 +177,7 @@ impl DatanodeBuilder { meta_client: None, kv_backend, cache_registry: None, + open_regions_writable_override: None, #[cfg(feature = "enterprise")] extension_range_provider_factory: None, topic_stats_reporter: None, @@ -205,6 +207,20 @@ impl DatanodeBuilder { self } + /// Overrides whether regions opened during datanode startup should become writable. + /// + /// When unset, the builder uses its default writable policy for reopened regions + /// (writable only when no metasrv client is configured). + /// + /// Warning: setting this to `true` on a metasrv-controlled datanode (one built + /// with `with_meta_client`) will promote regions to Leader before heartbeat and + /// lease coordination begin, bypassing the metasrv safety contract and creating a + /// potential split-brain window during startup. 
+ pub fn with_open_regions_writable_override(&mut self, writable: bool) -> &mut Self { + self.open_regions_writable_override = Some(writable); + self + } + #[cfg(feature = "enterprise")] pub fn with_extension_range_provider( &mut self, @@ -274,10 +290,13 @@ impl DatanodeBuilder { let region_open_requests = build_region_open_requests(node_id, self.kv_backend.clone()).await?; + let open_with_writable = self + .open_regions_writable_override + .unwrap_or(!controlled_by_metasrv); let open_all_regions = open_all_regions( region_server.clone(), region_open_requests, - !controlled_by_metasrv, + open_with_writable, self.opts.init_regions_parallelism, // Ignore nonexistent regions in recovery mode. is_recovery_mode,