refactor: customize standalone instance build (#7807)

* refactor: customize standalone instance build

Signed-off-by: luofucong <luofc@foxmail.com>

* resolve PR comments

Signed-off-by: luofucong <luofc@foxmail.com>

---------

Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
LFC
2026-03-13 17:25:21 +08:00
committed by GitHub
parent 20f38d8a6a
commit 74ff5c37ea
5 changed files with 187 additions and 74 deletions

View File

@@ -32,14 +32,15 @@ use common_meta::cache::LayeredCacheRegistryBuilder;
use common_meta::ddl::flow_meta::FlowMetadataAllocator;
use common_meta::ddl::table_meta::TableMetadataAllocator;
use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl};
use common_meta::ddl_manager::{DdlManager, DdlManagerConfiguratorRef};
use common_meta::ddl_manager::{DdlManager, DdlManagerConfiguratorRef, DdlManagerRef};
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::procedure_executor::LocalProcedureExecutor;
use common_meta::node_manager::{FlownodeRef, NodeManagerRef};
use common_meta::procedure_executor::{LocalProcedureExecutor, ProcedureExecutorRef};
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::sequence::SequenceBuilder;
use common_meta::sequence::{Sequence, SequenceBuilder};
use common_meta::wal_provider::{WalProviderRef, build_wal_provider};
use common_procedure::ProcedureManagerRef;
use common_query::prelude::set_default_prefix;
@@ -49,6 +50,7 @@ use common_time::timezone::set_default_timezone;
use common_version::{short_version, verbose_version};
use datanode::config::DatanodeOptions;
use datanode::datanode::{Datanode, DatanodeBuilder};
use datanode::region_server::RegionServer;
use flow::{
FlownodeBuilder, FlownodeInstance, FlownodeOptions, FrontendClient, FrontendInvoker,
GrpcQueryHandlerWithBoxedError,
@@ -58,6 +60,7 @@ use frontend::instance::StandaloneDatanodeManager;
use frontend::instance::builder::FrontendBuilder;
use frontend::server::Services;
use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
use plugins::PluginOptions;
use plugins::frontend::context::{
CatalogManagerConfigureContext, StandaloneCatalogManagerConfigureContext,
};
@@ -130,6 +133,18 @@ impl Instance {
pub fn server_addr(&self, name: &str) -> Option<SocketAddr> {
self.frontend.server_handlers().addr(name)
}
/// Get the mutable Frontend component of this Standalone instance for externally modification
/// by others (might not be in this code base, so don't delete this function).
pub fn mut_frontend(&mut self) -> &mut Frontend {
&mut self.frontend
}
/// Get the Datanode component of this Standalone instance for externally usage
/// by others (might not be in this code base, so don't delete this function).
pub fn datanode(&self) -> &Datanode {
&self.datanode
}
}
#[async_trait]
@@ -342,9 +357,18 @@ impl StartCommand {
info!("Standalone start command: {:#?}", self);
info!("Standalone options: {opts:#?}");
let (mut instance, _) =
Self::build_with(opts.component, opts.plugins, InstanceCreator::default()).await?;
instance._guard.extend(guard);
Ok(instance)
}
pub async fn build_with(
mut opts: StandaloneOptions,
plugin_opts: Vec<PluginOptions>,
creator: InstanceCreator,
) -> Result<(Instance, InstanceCreatorResult)> {
let mut plugins = Plugins::new();
let plugin_opts = opts.plugins;
let mut opts = opts.component;
set_default_prefix(opts.default_column_prefix.as_deref())
.map_err(BoxedError::new)
.context(error::BuildCliSnafu)?;
@@ -462,17 +486,16 @@ impl StartCommand {
.await;
}
let node_manager = Arc::new(StandaloneDatanodeManager {
region_server: datanode.region_server(),
flow_server: flownode.flow_engine(),
});
let node_manager = creator
.node_manager_creator
.create(
&kv_backend,
datanode.region_server(),
flownode.flow_engine(),
)
.await?;
let table_id_allocator = Arc::new(
SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
.initial(MIN_USER_TABLE_ID as u64)
.step(10)
.build(),
);
let table_id_allocator = creator.table_id_allocator_creator.create(&kv_backend);
let flow_id_sequence = Arc::new(
SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone())
.initial(MIN_USER_FLOW_ID as u64)
@@ -489,7 +512,7 @@ impl StartCommand {
.context(error::BuildWalProviderSnafu)?;
let wal_provider = Arc::new(wal_provider);
let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
table_id_allocator,
table_id_allocator.clone(),
wal_provider.clone(),
));
let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
@@ -532,10 +555,10 @@ impl StartCommand {
ddl_manager
};
let procedure_executor = Arc::new(LocalProcedureExecutor::new(
Arc::new(ddl_manager),
procedure_manager.clone(),
));
let procedure_executor = creator
.procedure_executor_creator
.create(Arc::new(ddl_manager), procedure_manager.clone())
.await?;
let fe_instance = FrontendBuilder::new(
fe_opts.clone(),
@@ -568,7 +591,7 @@ impl StartCommand {
kv_backend.clone(),
layered_cache_registry.clone(),
procedure_executor,
node_manager,
node_manager.clone(),
)
.await
.context(StartFlownodeSnafu)?;
@@ -584,14 +607,20 @@ impl StartCommand {
heartbeat_task: None,
};
Ok(Instance {
let instance = Instance {
datanode,
frontend,
flownode,
procedure_manager,
wal_provider,
_guard: guard,
})
_guard: vec![],
};
let result = InstanceCreatorResult {
kv_backend,
node_manager,
table_id_allocator,
};
Ok((instance, result))
}
pub async fn create_table_metadata_manager(
@@ -608,6 +637,115 @@ impl StartCommand {
}
}
#[async_trait]
pub trait NodeManagerCreator {
async fn create(
&self,
kv_backend: &KvBackendRef,
region_server: RegionServer,
flow_server: FlownodeRef,
) -> Result<NodeManagerRef>;
}
pub struct DefaultNodeManagerCreator;
#[async_trait]
impl NodeManagerCreator for DefaultNodeManagerCreator {
async fn create(
&self,
_: &KvBackendRef,
region_server: RegionServer,
flow_server: FlownodeRef,
) -> Result<NodeManagerRef> {
Ok(Arc::new(StandaloneDatanodeManager {
region_server,
flow_server,
}))
}
}
pub trait TableIdAllocatorCreator {
fn create(&self, kv_backend: &KvBackendRef) -> Arc<Sequence>;
}
struct DefaultTableIdAllocatorCreator;
impl TableIdAllocatorCreator for DefaultTableIdAllocatorCreator {
fn create(&self, kv_backend: &KvBackendRef) -> Arc<Sequence> {
Arc::new(
SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
.initial(MIN_USER_TABLE_ID as u64)
.step(10)
.build(),
)
}
}
#[async_trait]
pub trait ProcedureExecutorCreator {
async fn create(
&self,
ddl_manager: DdlManagerRef,
procedure_manager: ProcedureManagerRef,
) -> Result<ProcedureExecutorRef>;
}
pub struct DefaultProcedureExecutorCreator;
#[async_trait]
impl ProcedureExecutorCreator for DefaultProcedureExecutorCreator {
async fn create(
&self,
ddl_manager: DdlManagerRef,
procedure_manager: ProcedureManagerRef,
) -> Result<ProcedureExecutorRef> {
Ok(Arc::new(LocalProcedureExecutor::new(
ddl_manager,
procedure_manager,
)))
}
}
/// `InstanceCreator` is used for grouping various component creators for building the
/// Standalone instance, suitable for customizing how the instance can be built.
pub struct InstanceCreator {
node_manager_creator: Box<dyn NodeManagerCreator>,
table_id_allocator_creator: Box<dyn TableIdAllocatorCreator>,
procedure_executor_creator: Box<dyn ProcedureExecutorCreator>,
}
impl InstanceCreator {
pub fn new(
node_manager_creator: Box<dyn NodeManagerCreator>,
table_id_allocator_creator: Box<dyn TableIdAllocatorCreator>,
procedure_executor_creator: Box<dyn ProcedureExecutorCreator>,
) -> Self {
Self {
node_manager_creator,
table_id_allocator_creator,
procedure_executor_creator,
}
}
}
impl Default for InstanceCreator {
fn default() -> Self {
Self {
node_manager_creator: Box::new(DefaultNodeManagerCreator),
table_id_allocator_creator: Box::new(DefaultTableIdAllocatorCreator),
procedure_executor_creator: Box::new(DefaultProcedureExecutorCreator),
}
}
}
/// `InstanceCreatorResult` is expected to be used paired with [InstanceCreator].
/// It stores the created and other important components for further reusing.
pub struct InstanceCreatorResult {
pub kv_backend: KvBackendRef,
pub node_manager: NodeManagerRef,
pub table_id_allocator: Arc<Sequence>,
}
#[cfg(test)]
mod tests {
use std::default::Default;

View File

@@ -28,7 +28,7 @@ type = 'File'
data_home = '{data_home}'
[meta_client_options]
metasrv_addrs = ['{metasrv_addr}']
metasrv_addrs = ['{addrs.metasrv_addr}']
timeout_millis = 3000
connect_timeout_millis = 5000
tcp_nodelay = false

View File

@@ -1,3 +1,3 @@
[grpc]
bind_addr = "{grpc_addr}"
server_addr = "{grpc_addr}"
bind_addr = "{addrs.grpc_addr}"
server_addr = "{addrs.grpc_addr}"

View File

@@ -26,12 +26,12 @@ type = 'File'
data_home = '{data_home}'
[grpc]
bind_addr = '{grpc_addr}'
bind_addr = '{addrs.grpc_addr}'
runtime_size = 8
[mysql]
enable = true
addr = "{mysql_addr}"
addr = "{addrs.mysql_addr}"
runtime_size = 2
prepared_stmt_cache_size= 10000
@@ -40,7 +40,7 @@ mode = "disable"
[postgres]
enable = true
addr = "{postgres_addr}"
addr = "{addrs.postgres_addr}"
runtime_size = 2
[procedure]

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::sync::{Mutex, OnceLock};
@@ -96,15 +96,7 @@ struct ConfigContext {
use_etcd: bool,
store_addrs: String,
instance_id: usize,
// for following addrs, leave it empty if not needed
// required for datanode
metasrv_addr: String,
// for frontend and standalone
grpc_addr: String,
// for standalone
mysql_addr: String,
// for standalone
postgres_addr: String,
addrs: HashMap<String, String>,
// enable flat format for storage engine
enable_flat_format: bool,
}
@@ -275,40 +267,26 @@ impl ServerMode {
let procedure_dir = data_home.join("procedure").display().to_string();
// Get the required addresses based on server mode
let (metasrv_addr, grpc_addr, mysql_addr, postgres_addr) = match self {
let addrs: HashMap<String, String> = match self {
ServerMode::Standalone {
rpc_bind_addr,
mysql_addr,
postgres_addr,
..
} => (
String::new(),
rpc_bind_addr.clone(),
mysql_addr.clone(),
postgres_addr.clone(),
),
ServerMode::Frontend {
rpc_bind_addr,
mysql_addr,
postgres_addr,
..
} => (
String::new(),
rpc_bind_addr.clone(),
mysql_addr.clone(),
postgres_addr.clone(),
),
ServerMode::Datanode {
rpc_bind_addr,
metasrv_addr,
..
} => (
metasrv_addr.clone(),
rpc_bind_addr.clone(),
String::new(),
String::new(),
),
_ => (String::new(), String::new(), String::new(), String::new()),
http_addr,
} => [
("http_addr".to_string(), http_addr.clone()),
("grpc_addr".to_string(), rpc_bind_addr.clone()),
("mysql_addr".to_string(), mysql_addr.clone()),
("postgres_addr".to_string(), postgres_addr.clone()),
]
.into(),
ServerMode::Frontend { rpc_bind_addr, .. } => {
[("grpc_addr".to_string(), rpc_bind_addr.clone())].into()
}
ServerMode::Datanode { metasrv_addr, .. } => {
[("metasrv_addr".to_string(), metasrv_addr.clone())].into()
}
_ => HashMap::new(),
};
let ctx = ConfigContext {
@@ -326,10 +304,7 @@ impl ServerMode {
.collect::<Vec<_>>()
.join(","),
instance_id: id,
metasrv_addr,
grpc_addr,
mysql_addr,
postgres_addr,
addrs,
enable_flat_format: db_ctx.store_config().enable_flat_format,
};