chore: add more fields to DdlManagerConfigureContext (#7310)

* feat: add more context for configurator

* move the flow grpc configure context to plugins crate

* move context to plugins crate

* add more fields

* fix: cargo check

* refactor: some

* refactor some

* adjust context

* fix: cargo check

* fix: ut
This commit is contained in:
fys
2025-12-01 16:03:12 +08:00
committed by GitHub
parent 18875eed4d
commit e107030d85
12 changed files with 101 additions and 42 deletions

2
Cargo.lock generated
View File

@@ -9501,6 +9501,7 @@ name = "plugins"
version = "1.0.0-beta.2"
dependencies = [
"auth",
"catalog",
"clap 4.5.40",
"cli",
"common-base",
@@ -9509,6 +9510,7 @@ dependencies = [
"datanode",
"flow",
"frontend",
"meta-client",
"meta-srv",
"serde",
"snafu 0.8.6",

View File

@@ -18,7 +18,6 @@ use std::sync::Arc;
use std::time::Duration;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::CatalogManagerRef;
use catalog::information_extension::DistributedInformationExtension;
use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManagerBuilder, MetaKvBackend};
use clap::Parser;
@@ -26,14 +25,12 @@ use client::client_manager::NodeClients;
use common_base::Plugins;
use common_config::{Configurable, DEFAULT_DATA_HOME};
use common_grpc::channel_manager::ChannelConfig;
use common_meta::FlownodeId;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::key::TableMetadataManager;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use common_stat::ResourceStatImpl;
use common_telemetry::info;
use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
@@ -43,6 +40,7 @@ use flow::{
get_flow_auth_options,
};
use meta_client::{MetaClientOptions, MetaClientType};
use plugins::flownode::context::GrpcConfigureContext;
use servers::configurator::GrpcBuilderConfiguratorRef;
use snafu::{OptionExt, ResultExt, ensure};
use tracing_appender::non_blocking::WorkerGuard;
@@ -435,11 +433,3 @@ impl StartCommand {
Ok(Instance::new(flownode, guard))
}
}
/// The context for [`GrpcBuilderConfiguratorRef`] in flownode.
pub struct GrpcConfigureContext {
pub kv_backend: KvBackendRef,
pub fe_client: Arc<FrontendClient>,
pub flownode_id: FlownodeId,
pub catalog_manager: CatalogManagerRef,
}

View File

@@ -45,7 +45,10 @@ use frontend::frontend::Frontend;
use frontend::heartbeat::HeartbeatTask;
use frontend::instance::builder::FrontendBuilder;
use frontend::server::Services;
use meta_client::{MetaClientOptions, MetaClientRef, MetaClientType};
use meta_client::{MetaClientOptions, MetaClientType};
use plugins::frontend::context::{
CatalogManagerConfigureContext, DistributedCatalogManagerConfigureContext,
};
use servers::addrs;
use servers::grpc::GrpcOptions;
use servers::tls::{TlsMode, TlsOption};
@@ -423,9 +426,11 @@ impl StartCommand {
let builder = if let Some(configurator) =
plugins.get::<CatalogManagerConfiguratorRef<CatalogManagerConfigureContext>>()
{
let ctx = CatalogManagerConfigureContext {
let ctx = DistributedCatalogManagerConfigureContext {
meta_client: meta_client.clone(),
};
let ctx = CatalogManagerConfigureContext::Distributed(ctx);
configurator
.configure(builder, ctx)
.await
@@ -482,11 +487,6 @@ impl StartCommand {
}
}
/// The context for [`CatalogManagerConfigratorRef`] in frontend.
pub struct CatalogManagerConfigureContext {
pub meta_client: MetaClientRef,
}
#[cfg(test)]
mod tests {
use std::io::Write;

View File

@@ -32,7 +32,7 @@ use common_meta::cache::LayeredCacheRegistryBuilder;
use common_meta::ddl::flow_meta::FlowMetadataAllocator;
use common_meta::ddl::table_meta::TableMetadataAllocator;
use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl};
use common_meta::ddl_manager::{DdlManager, DdlManagerConfiguratorRef, DdlManagerConfigureContext};
use common_meta::ddl_manager::{DdlManager, DdlManagerConfiguratorRef};
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
@@ -58,6 +58,10 @@ use frontend::instance::StandaloneDatanodeManager;
use frontend::instance::builder::FrontendBuilder;
use frontend::server::Services;
use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
use plugins::frontend::context::{
CatalogManagerConfigureContext, StandaloneCatalogManagerConfigureContext,
};
use plugins::standalone::context::DdlManagerConfigureContext;
use servers::tls::{TlsMode, TlsOption};
use snafu::ResultExt;
use standalone::StandaloneInformationExtension;
@@ -414,9 +418,10 @@ impl StartCommand {
let builder = if let Some(configurator) =
plugins.get::<CatalogManagerConfiguratorRef<CatalogManagerConfigureContext>>()
{
let ctx = CatalogManagerConfigureContext {
let ctx = StandaloneCatalogManagerConfigureContext {
fe_client: frontend_client.clone(),
};
let ctx = CatalogManagerConfigureContext::Standalone(ctx);
configurator
.configure(builder, ctx)
.await
@@ -506,9 +511,13 @@ impl StartCommand {
let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager.clone(), true)
.context(error::InitDdlManagerSnafu)?;
let ddl_manager = if let Some(configurator) = plugins.get::<DdlManagerConfiguratorRef>() {
let ddl_manager = if let Some(configurator) =
plugins.get::<DdlManagerConfiguratorRef<DdlManagerConfigureContext>>()
{
let ctx = DdlManagerConfigureContext {
kv_backend: kv_backend.clone(),
fe_client: frontend_client.clone(),
catalog_manager: catalog_manager.clone(),
};
configurator
.configure(ddl_manager, ctx)
@@ -595,11 +604,6 @@ impl StartCommand {
}
}
/// The context for [`CatalogManagerConfigratorRef`] in standalone.
pub struct CatalogManagerConfigureContext {
pub fe_client: Arc<FrontendClient>,
}
#[cfg(test)]
mod tests {
use std::default::Default;

View File

@@ -32,7 +32,12 @@ impl Plugins {
pub fn insert<T: 'static + Send + Sync>(&self, value: T) {
let last = self.write().insert(value);
assert!(last.is_none(), "each type of plugins must be one and only");
if last.is_some() {
panic!(
"Plugin of type {} already exists",
std::any::type_name::<T>()
);
}
}
pub fn get<T: 'static + Send + Sync + Clone>(&self) -> Option<T> {
@@ -140,7 +145,7 @@ mod tests {
}
#[test]
#[should_panic(expected = "each type of plugins must be one and only")]
#[should_panic(expected = "Plugin of type i32 already exists")]
fn test_plugin_uniqueness() {
let plugins = Plugins::new();
plugins.insert(1i32);

View File

@@ -46,7 +46,6 @@ use crate::error::{
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use crate::kv_backend::KvBackendRef;
use crate::procedure_executor::ExecutorContext;
#[cfg(feature = "enterprise")]
use crate::rpc::ddl::DdlTask::CreateTrigger;
@@ -70,20 +69,16 @@ use crate::rpc::router::RegionRoute;
/// A configurator that customizes or enhances a [`DdlManager`].
#[async_trait::async_trait]
pub trait DdlManagerConfigurator: Send + Sync {
pub trait DdlManagerConfigurator<C>: Send + Sync {
/// Configures the given [`DdlManager`] using the provided [`DdlManagerConfigureContext`].
async fn configure(
&self,
ddl_manager: DdlManager,
ctx: DdlManagerConfigureContext,
ctx: C,
) -> std::result::Result<DdlManager, BoxedError>;
}
pub type DdlManagerConfiguratorRef = Arc<dyn DdlManagerConfigurator>;
pub struct DdlManagerConfigureContext {
pub kv_backend: KvBackendRef,
}
pub type DdlManagerConfiguratorRef<C> = Arc<dyn DdlManagerConfigurator<C>>;
pub type DdlManagerRef = Arc<DdlManager>;

View File

@@ -28,7 +28,7 @@ use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocato
use common_meta::ddl::{
DdlContext, NoopRegionFailureDetectorControl, RegionFailureDetectorControllerRef,
};
use common_meta::ddl_manager::{DdlManager, DdlManagerConfiguratorRef, DdlManagerConfigureContext};
use common_meta::ddl_manager::{DdlManager, DdlManagerConfiguratorRef};
use common_meta::distributed_time_constants::{self};
use common_meta::key::TableMetadataManager;
use common_meta::key::flow::FlowMetadataManager;
@@ -405,10 +405,11 @@ impl MetasrvBuilder {
let ddl_manager = if let Some(configurator) = plugins
.as_ref()
.and_then(|p| p.get::<DdlManagerConfiguratorRef>())
.and_then(|p| p.get::<DdlManagerConfiguratorRef<DdlManagerConfigureContext>>())
{
let ctx = DdlManagerConfigureContext {
kv_backend: kv_backend.clone(),
meta_peer_client: meta_peer_client.clone(),
};
configurator
.configure(ddl_manager, ctx)
@@ -637,3 +638,9 @@ impl Default for MetasrvBuilder {
Self::new()
}
}
/// The context for [`DdlManagerConfiguratorRef`].
pub struct DdlManagerConfigureContext {
pub kv_backend: KvBackendRef,
pub meta_peer_client: MetaPeerClientRef,
}

View File

@@ -9,6 +9,7 @@ workspace = true
[dependencies]
auth.workspace = true
catalog.workspace = true
clap.workspace = true
cli.workspace = true
common-base.workspace = true
@@ -17,6 +18,7 @@ common-meta.workspace = true
datanode.workspace = true
flow.workspace = true
frontend.workspace = true
meta-client.workspace = true
meta-srv.workspace = true
serde.workspace = true
snafu.workspace = true

View File

@@ -30,3 +30,20 @@ pub async fn setup_flownode_plugins(
pub async fn start_flownode_plugins(_plugins: Plugins) -> Result<()> {
Ok(())
}
pub mod context {
use std::sync::Arc;
use catalog::CatalogManagerRef;
use common_meta::FlownodeId;
use common_meta::kv_backend::KvBackendRef;
use flow::FrontendClient;
/// The context for `GrpcBuilderConfiguratorRef` in flownode.
pub struct GrpcConfigureContext {
pub kv_backend: KvBackendRef,
pub fe_client: Arc<FrontendClient>,
pub flownode_id: FlownodeId,
pub catalog_manager: CatalogManagerRef,
}
}

View File

@@ -40,3 +40,25 @@ pub async fn setup_frontend_plugins(
pub async fn start_frontend_plugins(_plugins: Plugins) -> Result<()> {
Ok(())
}
pub mod context {
use std::sync::Arc;
use flow::FrontendClient;
use meta_client::MetaClientRef;
/// The context for [`catalog::kvbackend::CatalogManagerConfiguratorRef`] in standalone or
/// distributed.
pub enum CatalogManagerConfigureContext {
Distributed(DistributedCatalogManagerConfigureContext),
Standalone(StandaloneCatalogManagerConfigureContext),
}
pub struct DistributedCatalogManagerConfigureContext {
pub meta_client: MetaClientRef,
}
pub struct StandaloneCatalogManagerConfigureContext {
pub fe_client: Arc<FrontendClient>,
}
}

View File

@@ -13,12 +13,12 @@
// limitations under the License.
mod cli;
mod datanode;
mod flownode;
mod frontend;
pub mod datanode;
pub mod flownode;
pub mod frontend;
mod meta_srv;
mod options;
mod standalone;
pub mod standalone;
pub use cli::SubCommand;
pub use datanode::{setup_datanode_plugins, start_datanode_plugins};

View File

@@ -33,3 +33,18 @@ pub async fn setup_standalone_plugins(
pub async fn start_standalone_plugins(_plugins: Plugins) -> Result<()> {
Ok(())
}
pub mod context {
use std::sync::Arc;
use catalog::CatalogManagerRef;
use common_meta::kv_backend::KvBackendRef;
use flow::FrontendClient;
/// The context for [`common_meta::ddl_manager::DdlManagerConfiguratorRef`] in standalone.
pub struct DdlManagerConfigureContext {
pub kv_backend: KvBackendRef,
pub fe_client: Arc<FrontendClient>,
pub catalog_manager: CatalogManagerRef,
}
}