chore: wrap standalone runtime with trait (#8083)

* chore: introduce standalone start service trait

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: add region server to the trait

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: add comments

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

---------

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
This commit is contained in:
shuiyisong
2026-05-11 18:06:29 +08:00
committed by GitHub
parent dc5fab93a8
commit 7279e48e22
2 changed files with 151 additions and 17 deletions

View File

@@ -125,6 +125,7 @@ pub struct Instance {
flownode: FlownodeInstance,
procedure_manager: ProcedureManagerRef,
wal_provider: WalProviderRef,
leader_services_controller: Box<dyn StandaloneLeaderServicesController>,
// Keep the logging guard to prevent the worker from being dropped.
_guard: Vec<WorkerGuard>,
}
@@ -157,15 +158,13 @@ impl App for Instance {
async fn start(&mut self) -> Result<()> {
self.datanode.start_telemetry();
self.procedure_manager
.start()
.await
.context(error::StartProcedureManagerSnafu)?;
self.wal_provider
.start()
.await
.context(error::StartWalProviderSnafu)?;
self.leader_services_controller
.start(
self.procedure_manager.clone(),
self.wal_provider.clone(),
self.datanode.region_server(),
)
.await?;
plugins::start_frontend_plugins(self.frontend.instance.plugins().clone())
.await
@@ -187,10 +186,12 @@ impl App for Instance {
.await
.context(error::ShutdownFrontendSnafu)?;
self.procedure_manager
.stop()
.await
.context(error::StopProcedureManagerSnafu)?;
self.leader_services_controller
.stop(
self.procedure_manager.clone(),
self.datanode.region_server(),
)
.await?;
self.datanode
.shutdown()
@@ -420,6 +421,9 @@ impl StartCommand {
let mut builder = DatanodeBuilder::new(dn_opts, plugins.clone(), kv_backend.clone());
builder.with_cache_registry(layered_cache_registry.clone());
if let Some(writable) = creator.open_regions_writable_override {
builder.with_open_regions_writable_override(writable);
}
let datanode = builder.build().await.context(error::StartDatanodeSnafu)?;
let information_extension = Arc::new(StandaloneInformationExtension::new(
@@ -617,6 +621,7 @@ impl StartCommand {
flownode,
procedure_manager,
wal_provider,
leader_services_controller: creator.leader_services_controller,
_guard: vec![],
};
let result = InstanceCreatorResult {
@@ -642,7 +647,7 @@ impl StartCommand {
}
#[async_trait]
pub trait NodeManagerCreator {
pub trait NodeManagerCreator: Send + Sync {
async fn create(
&self,
kv_backend: &KvBackendRef,
@@ -688,7 +693,7 @@ impl MetadataKvBackendCreator for DefaultMetadataKvBackendCreator {
}
}
pub trait TableIdAllocatorCreator {
pub trait TableIdAllocatorCreator: Send + Sync {
fn create(&self, kv_backend: &KvBackendRef) -> Arc<Sequence>;
}
@@ -706,7 +711,7 @@ impl TableIdAllocatorCreator for DefaultTableIdAllocatorCreator {
}
#[async_trait]
pub trait ProcedureExecutorCreator {
pub trait ProcedureExecutorCreator: Send + Sync {
async fn create(
&self,
ddl_manager: DdlManagerRef,
@@ -730,6 +735,59 @@ impl ProcedureExecutorCreator for DefaultProcedureExecutorCreator {
}
}
#[async_trait]
pub trait StandaloneLeaderServicesController: Send + Sync {
/// Starts services that manage standalone metadata or WAL state.
///
/// The default implementation starts the procedure manager and WAL provider
/// during instance startup.
async fn start(
&self,
procedure_manager: ProcedureManagerRef,
wal_provider: WalProviderRef,
region_server: RegionServer,
) -> Result<()>;
/// Stops services started by [`StandaloneLeaderServicesController::start`].
async fn stop(
&self,
procedure_manager: ProcedureManagerRef,
region_server: RegionServer,
) -> Result<()>;
}
pub struct DefaultStandaloneLeaderServicesController;
#[async_trait]
impl StandaloneLeaderServicesController for DefaultStandaloneLeaderServicesController {
async fn start(
&self,
procedure_manager: ProcedureManagerRef,
wal_provider: WalProviderRef,
_region_server: RegionServer,
) -> Result<()> {
procedure_manager
.start()
.await
.context(error::StartProcedureManagerSnafu)?;
wal_provider
.start()
.await
.context(error::StartWalProviderSnafu)
}
async fn stop(
&self,
procedure_manager: ProcedureManagerRef,
_region_server: RegionServer,
) -> Result<()> {
procedure_manager
.stop()
.await
.context(error::StopProcedureManagerSnafu)
}
}
/// `InstanceCreator` is used for grouping various component creators for building the
/// Standalone instance, suitable for customizing how the instance can be built.
pub struct InstanceCreator {
@@ -739,6 +797,8 @@ pub struct InstanceCreator {
node_manager_creator: Box<dyn NodeManagerCreator>,
table_id_allocator_creator: Box<dyn TableIdAllocatorCreator>,
procedure_executor_creator: Box<dyn ProcedureExecutorCreator>,
leader_services_controller: Box<dyn StandaloneLeaderServicesController>,
open_regions_writable_override: Option<bool>,
}
impl InstanceCreator {
@@ -752,6 +812,8 @@ impl InstanceCreator {
node_manager_creator,
table_id_allocator_creator,
procedure_executor_creator,
leader_services_controller: Box::new(DefaultStandaloneLeaderServicesController),
open_regions_writable_override: None,
}
}
@@ -762,6 +824,57 @@ impl InstanceCreator {
self.metadata_kv_backend_creator = metadata_kv_backend_creator;
self
}
/// Wraps the metadata backend creator while retaining the default creator.
///
/// This is useful for callers that need to add runtime behavior around
/// metadata access without reimplementing backend selection.
pub fn map_metadata_kv_backend_creator<F>(mut self, f: F) -> Self
where
F: FnOnce(Box<dyn MetadataKvBackendCreator>) -> Box<dyn MetadataKvBackendCreator>,
{
self.metadata_kv_backend_creator = f(self.metadata_kv_backend_creator);
self
}
/// Wraps node-manager creation while preserving the selected standalone node manager.
pub fn map_node_manager_creator<F>(mut self, f: F) -> Self
where
F: FnOnce(Box<dyn NodeManagerCreator>) -> Box<dyn NodeManagerCreator>,
{
self.node_manager_creator = f(self.node_manager_creator);
self
}
/// Wraps procedure-executor creation while preserving the current setup.
pub fn map_procedure_executor_creator<F>(mut self, f: F) -> Self
where
F: FnOnce(Box<dyn ProcedureExecutorCreator>) -> Box<dyn ProcedureExecutorCreator>,
{
self.procedure_executor_creator = f(self.procedure_executor_creator);
self
}
/// Replaces startup/shutdown ownership for procedure manager and WAL provider.
pub fn with_leader_services_controller(
mut self,
leader_services_controller: Box<dyn StandaloneLeaderServicesController>,
) -> Self {
self.leader_services_controller = leader_services_controller;
self
}
/// Overrides whether regions opened during startup should become writable.
///
/// `None` keeps the default startup behavior (regions open writable).
///
/// Warning: setting this to `false` in standalone mode will leave reopened regions
/// permanently read-only. Standalone has no metasrv heartbeat or region-role
/// reconciliation, so there is no path to promote regions to Leader after startup.
pub fn with_open_regions_writable_override(mut self, writable: bool) -> Self {
self.open_regions_writable_override = Some(writable);
self
}
}
impl Default for InstanceCreator {
@@ -771,6 +884,8 @@ impl Default for InstanceCreator {
node_manager_creator: Box::new(DefaultNodeManagerCreator),
table_id_allocator_creator: Box::new(DefaultTableIdAllocatorCreator),
procedure_executor_creator: Box::new(DefaultProcedureExecutorCreator),
leader_services_controller: Box::new(DefaultStandaloneLeaderServicesController),
open_regions_writable_override: None,
}
}
}

View File

@@ -163,6 +163,7 @@ pub struct DatanodeBuilder {
kv_backend: KvBackendRef,
cache_registry: Option<Arc<LayeredCacheRegistry>>,
topic_stats_reporter: Option<Box<dyn TopicStatsReporter>>,
open_regions_writable_override: Option<bool>,
#[cfg(feature = "enterprise")]
extension_range_provider_factory: Option<mito2::extension::BoxedExtensionRangeProviderFactory>,
}
@@ -176,6 +177,7 @@ impl DatanodeBuilder {
meta_client: None,
kv_backend,
cache_registry: None,
open_regions_writable_override: None,
#[cfg(feature = "enterprise")]
extension_range_provider_factory: None,
topic_stats_reporter: None,
@@ -205,6 +207,20 @@ impl DatanodeBuilder {
self
}
/// Overrides whether regions opened during datanode startup should become writable.
///
/// When unset, the builder uses its default writable policy for reopened regions
/// (writable only when no metasrv client is configured).
///
/// Warning: setting this to `true` on a metasrv-controlled datanode (one built
/// with `with_meta_client`) will promote regions to Leader before heartbeat and
/// lease coordination begin, bypassing the metasrv safety contract and creating a
/// potential split-brain window during startup.
pub fn with_open_regions_writable_override(&mut self, writable: bool) -> &mut Self {
self.open_regions_writable_override = Some(writable);
self
}
#[cfg(feature = "enterprise")]
pub fn with_extension_range_provider(
&mut self,
@@ -274,10 +290,13 @@ impl DatanodeBuilder {
let region_open_requests =
build_region_open_requests(node_id, self.kv_backend.clone()).await?;
let open_with_writable = self
.open_regions_writable_override
.unwrap_or(!controlled_by_metasrv);
let open_all_regions = open_all_regions(
region_server.clone(),
region_open_requests,
!controlled_by_metasrv,
open_with_writable,
self.opts.init_regions_parallelism,
// Ignore nonexistent regions in recovery mode.
is_recovery_mode,