feat: add some configurable points (#7227)

* feat: enhance extension

* fix: cr

* move information schema table factories trait to standalone

* fix: self cr

* remove extension factory

* refactor

* remove extension filed from greptime options struct

* refactor

* minor refactor

* fix: cargo check

* fix: clippy

* fix: license check

* feat: enhance grpc and http configurator in servers crate

* grpc builder configurator

* remove unused file

* complete the remaining expansion points.

* fix: self-cr

* rename

* fix: typo
This commit is contained in:
fys
2025-11-27 17:21:46 +08:00
committed by GitHub
parent afefc0c604
commit 020477994b
17 changed files with 254 additions and 169 deletions

View File

@@ -5,7 +5,6 @@ edition.workspace = true
license.workspace = true
[features]
enterprise = []
testing = []
[lints]

View File

@@ -12,13 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub use client::{CachedKvBackend, CachedKvBackendBuilder, MetaKvBackend};
mod builder;
mod client;
mod manager;
mod table_cache;
pub use builder::KvBackendCatalogManagerBuilder;
pub use builder::{
CatalogManagerConfigurator, CatalogManagerConfiguratorRef, KvBackendCatalogManagerBuilder,
};
pub use client::{CachedKvBackend, CachedKvBackendBuilder, MetaKvBackend};
pub use manager::KvBackendCatalogManager;
pub use table_cache::{TableCache, TableCacheRef, new_table_cache};

View File

@@ -12,9 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_error::ext::BoxedError;
use common_meta::cache::LayeredCacheRegistryRef;
use common_meta::key::TableMetadataManager;
use common_meta::key::flow::FlowMetadataManager;
@@ -23,24 +25,34 @@ use common_procedure::ProcedureManagerRef;
use moka::sync::Cache;
use partition::manager::PartitionRuleManager;
#[cfg(feature = "enterprise")]
use crate::information_schema::InformationSchemaTableFactoryRef;
use crate::information_schema::{InformationExtensionRef, InformationSchemaProvider};
use crate::information_schema::{
InformationExtensionRef, InformationSchemaProvider, InformationSchemaTableFactoryRef,
};
use crate::kvbackend::KvBackendCatalogManager;
use crate::kvbackend::manager::{CATALOG_CACHE_MAX_CAPACITY, SystemCatalog};
use crate::process_manager::ProcessManagerRef;
use crate::system_schema::numbers_table_provider::NumbersTableProvider;
use crate::system_schema::pg_catalog::PGCatalogProvider;
/// The configurator that customizes or enhances the [`KvBackendCatalogManagerBuilder`].
#[async_trait::async_trait]
pub trait CatalogManagerConfigurator<C>: Send + Sync {
async fn configure(
&self,
builder: KvBackendCatalogManagerBuilder,
ctx: C,
) -> std::result::Result<KvBackendCatalogManagerBuilder, BoxedError>;
}
pub type CatalogManagerConfiguratorRef<C> = Arc<dyn CatalogManagerConfigurator<C>>;
pub struct KvBackendCatalogManagerBuilder {
information_extension: InformationExtensionRef,
backend: KvBackendRef,
cache_registry: LayeredCacheRegistryRef,
procedure_manager: Option<ProcedureManagerRef>,
process_manager: Option<ProcessManagerRef>,
#[cfg(feature = "enterprise")]
extra_information_table_factories:
std::collections::HashMap<String, InformationSchemaTableFactoryRef>,
extra_information_table_factories: HashMap<String, InformationSchemaTableFactoryRef>,
}
impl KvBackendCatalogManagerBuilder {
@@ -55,8 +67,7 @@ impl KvBackendCatalogManagerBuilder {
cache_registry,
procedure_manager: None,
process_manager: None,
#[cfg(feature = "enterprise")]
extra_information_table_factories: std::collections::HashMap::new(),
extra_information_table_factories: HashMap::new(),
}
}
@@ -71,10 +82,9 @@ impl KvBackendCatalogManagerBuilder {
}
/// Sets the extra information tables.
#[cfg(feature = "enterprise")]
pub fn with_extra_information_table_factories(
mut self,
factories: std::collections::HashMap<String, InformationSchemaTableFactoryRef>,
factories: HashMap<String, InformationSchemaTableFactoryRef>,
) -> Self {
self.extra_information_table_factories = factories;
self
@@ -87,7 +97,6 @@ impl KvBackendCatalogManagerBuilder {
cache_registry,
procedure_manager,
process_manager,
#[cfg(feature = "enterprise")]
extra_information_table_factories,
} = self;
Arc::new_cyclic(|me| KvBackendCatalogManager {
@@ -111,7 +120,6 @@ impl KvBackendCatalogManagerBuilder {
process_manager.clone(),
backend.clone(),
);
#[cfg(feature = "enterprise")]
let provider = provider
.with_extra_table_factories(extra_information_table_factories.clone());
Arc::new(provider)
@@ -123,7 +131,6 @@ impl KvBackendCatalogManagerBuilder {
numbers_table_provider: NumbersTableProvider,
backend,
process_manager,
#[cfg(feature = "enterprise")]
extra_information_table_factories,
},
cache_registry,

View File

@@ -53,9 +53,9 @@ use crate::error::{
CacheNotFoundSnafu, GetTableCacheSnafu, InvalidTableInfoInCatalogSnafu, ListCatalogsSnafu,
ListSchemasSnafu, ListTablesSnafu, Result, TableMetadataManagerSnafu,
};
#[cfg(feature = "enterprise")]
use crate::information_schema::InformationSchemaTableFactoryRef;
use crate::information_schema::{InformationExtensionRef, InformationSchemaProvider};
use crate::information_schema::{
InformationExtensionRef, InformationSchemaProvider, InformationSchemaTableFactoryRef,
};
use crate::kvbackend::TableCacheRef;
use crate::process_manager::ProcessManagerRef;
use crate::system_schema::SystemSchemaProvider;
@@ -557,7 +557,6 @@ pub(super) struct SystemCatalog {
pub(super) numbers_table_provider: NumbersTableProvider,
pub(super) backend: KvBackendRef,
pub(super) process_manager: Option<ProcessManagerRef>,
#[cfg(feature = "enterprise")]
pub(super) extra_information_table_factories:
std::collections::HashMap<String, InformationSchemaTableFactoryRef>,
}
@@ -628,7 +627,6 @@ impl SystemCatalog {
self.process_manager.clone(),
self.backend.clone(),
);
#[cfg(feature = "enterprise")]
let provider = provider
.with_extra_table_factories(self.extra_information_table_factories.clone());
Arc::new(provider)

View File

@@ -117,7 +117,6 @@ macro_rules! setup_memory_table {
};
}
#[cfg(feature = "enterprise")]
pub struct MakeInformationTableRequest {
pub catalog_name: String,
pub catalog_manager: Weak<dyn CatalogManager>,
@@ -128,12 +127,10 @@ pub struct MakeInformationTableRequest {
///
/// This trait allows for extensibility of the information schema by providing
/// a way to dynamically create custom information schema tables.
#[cfg(feature = "enterprise")]
pub trait InformationSchemaTableFactory {
fn make_information_table(&self, req: MakeInformationTableRequest) -> SystemTableRef;
}
#[cfg(feature = "enterprise")]
pub type InformationSchemaTableFactoryRef = Arc<dyn InformationSchemaTableFactory + Send + Sync>;
/// The `information_schema` tables info provider.
@@ -143,9 +140,7 @@ pub struct InformationSchemaProvider {
process_manager: Option<ProcessManagerRef>,
flow_metadata_manager: Arc<FlowMetadataManager>,
tables: HashMap<String, TableRef>,
#[allow(dead_code)]
kv_backend: KvBackendRef,
#[cfg(feature = "enterprise")]
extra_table_factories: HashMap<String, InformationSchemaTableFactoryRef>,
}
@@ -166,7 +161,6 @@ impl SystemSchemaProviderInner for InformationSchemaProvider {
}
fn system_table(&self, name: &str) -> Option<SystemTableRef> {
#[cfg(feature = "enterprise")]
if let Some(factory) = self.extra_table_factories.get(name) {
let req = MakeInformationTableRequest {
catalog_name: self.catalog_name.clone(),
@@ -281,7 +275,6 @@ impl InformationSchemaProvider {
process_manager,
tables: HashMap::new(),
kv_backend,
#[cfg(feature = "enterprise")]
extra_table_factories: HashMap::new(),
};
@@ -290,7 +283,6 @@ impl InformationSchemaProvider {
provider
}
#[cfg(feature = "enterprise")]
pub(crate) fn with_extra_table_factories(
mut self,
factories: HashMap<String, InformationSchemaTableFactoryRef>,
@@ -358,7 +350,6 @@ impl InformationSchemaProvider {
if let Some(process_list) = self.build_table(PROCESS_LIST) {
tables.insert(PROCESS_LIST.to_string(), process_list);
}
#[cfg(feature = "enterprise")]
for name in self.extra_table_factories.keys() {
tables.insert(name.clone(), self.build_table(name).expect(name));
}

View File

@@ -16,7 +16,7 @@ default = [
"meta-srv/pg_kvbackend",
"meta-srv/mysql_kvbackend",
]
enterprise = ["common-meta/enterprise", "frontend/enterprise", "meta-srv/enterprise", "catalog/enterprise"]
enterprise = ["common-meta/enterprise", "frontend/enterprise", "meta-srv/enterprise"]
tokio-console = ["common-telemetry/tokio-console"]
[lints]

View File

@@ -12,11 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Debug;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::CatalogManagerRef;
use catalog::information_extension::DistributedInformationExtension;
use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManagerBuilder, MetaKvBackend};
use clap::Parser;
@@ -24,12 +26,14 @@ use client::client_manager::NodeClients;
use common_base::Plugins;
use common_config::{Configurable, DEFAULT_DATA_HOME};
use common_grpc::channel_manager::ChannelConfig;
use common_meta::FlownodeId;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::key::TableMetadataManager;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use common_stat::ResourceStatImpl;
use common_telemetry::info;
use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
@@ -39,12 +43,13 @@ use flow::{
get_flow_auth_options,
};
use meta_client::{MetaClientOptions, MetaClientType};
use servers::configurator::GrpcBuilderConfiguratorRef;
use snafu::{OptionExt, ResultExt, ensure};
use tracing_appender::non_blocking::WorkerGuard;
use crate::error::{
BuildCacheRegistrySnafu, InitMetadataSnafu, LoadLayeredConfigSnafu, MetaClientInitSnafu,
MissingConfigSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu,
MissingConfigSnafu, OtherSnafu, Result, ShutdownFlownodeSnafu, StartFlownodeSnafu,
};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{App, create_resource_limit_metrics, log_versions, maybe_activate_heap_profile};
@@ -55,33 +60,14 @@ type FlownodeOptions = GreptimeOptions<flow::FlownodeOptions>;
pub struct Instance {
flownode: FlownodeInstance,
// The components of flownode, which make it easier to expand based
// on the components.
#[cfg(feature = "enterprise")]
components: Components,
// Keep the logging guard to prevent the worker from being dropped.
_guard: Vec<WorkerGuard>,
}
#[cfg(feature = "enterprise")]
pub struct Components {
pub catalog_manager: catalog::CatalogManagerRef,
pub fe_client: Arc<FrontendClient>,
pub kv_backend: common_meta::kv_backend::KvBackendRef,
}
impl Instance {
pub fn new(
flownode: FlownodeInstance,
#[cfg(feature = "enterprise")] components: Components,
guard: Vec<WorkerGuard>,
) -> Self {
pub fn new(flownode: FlownodeInstance, guard: Vec<WorkerGuard>) -> Self {
Self {
flownode,
#[cfg(feature = "enterprise")]
components,
_guard: guard,
}
}
@@ -94,11 +80,6 @@ impl Instance {
pub fn flownode_mut(&mut self) -> &mut FlownodeInstance {
&mut self.flownode
}
#[cfg(feature = "enterprise")]
pub fn components(&self) -> &Components {
&self.components
}
}
#[async_trait::async_trait]
@@ -396,7 +377,7 @@ impl StartCommand {
let frontend_client = Arc::new(frontend_client);
let flownode_builder = FlownodeBuilder::new(
opts.clone(),
plugins,
plugins.clone(),
table_metadata_manager,
catalog_manager.clone(),
flow_metadata_manager,
@@ -405,8 +386,29 @@ impl StartCommand {
.with_heartbeat_task(heartbeat_task);
let mut flownode = flownode_builder.build().await.context(StartFlownodeSnafu)?;
let builder =
FlownodeServiceBuilder::grpc_server_builder(&opts, flownode.flownode_server());
let builder = if let Some(configurator) =
plugins.get::<GrpcBuilderConfiguratorRef<GrpcConfigureContext>>()
{
let context = GrpcConfigureContext {
kv_backend: cached_meta_backend.clone(),
fe_client: frontend_client.clone(),
flownode_id: member_id,
catalog_manager: catalog_manager.clone(),
};
configurator
.configure(builder, context)
.await
.context(OtherSnafu)?
} else {
builder
};
let grpc_server = builder.build();
let services = FlownodeServiceBuilder::new(&opts)
.with_default_grpc_server(flownode.flownode_server())
.with_grpc_server(grpc_server)
.enable_http_service()
.build()
.context(StartFlownodeSnafu)?;
@@ -430,16 +432,14 @@ impl StartCommand {
.set_frontend_invoker(invoker)
.await;
#[cfg(feature = "enterprise")]
let components = Components {
catalog_manager: catalog_manager.clone(),
fe_client: frontend_client,
kv_backend: cached_meta_backend,
};
#[cfg(not(feature = "enterprise"))]
return Ok(Instance::new(flownode, guard));
#[cfg(feature = "enterprise")]
Ok(Instance::new(flownode, components, guard))
Ok(Instance::new(flownode, guard))
}
}
/// The context for [`GrpcBuilderConfiguratorRef`] in flownode.
pub struct GrpcConfigureContext {
pub kv_backend: KvBackendRef,
pub fe_client: Arc<FrontendClient>,
pub flownode_id: FlownodeId,
pub catalog_manager: CatalogManagerRef,
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Debug;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
@@ -19,7 +20,10 @@ use std::time::Duration;
use async_trait::async_trait;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::information_extension::DistributedInformationExtension;
use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManagerBuilder, MetaKvBackend};
use catalog::kvbackend::{
CachedKvBackendBuilder, CatalogManagerConfiguratorRef, KvBackendCatalogManagerBuilder,
MetaKvBackend,
};
use catalog::process_manager::ProcessManager;
use clap::Parser;
use client::client_manager::NodeClients;
@@ -41,14 +45,14 @@ use frontend::frontend::Frontend;
use frontend::heartbeat::HeartbeatTask;
use frontend::instance::builder::FrontendBuilder;
use frontend::server::Services;
use meta_client::{MetaClientOptions, MetaClientType};
use meta_client::{MetaClientOptions, MetaClientRef, MetaClientType};
use servers::addrs;
use servers::grpc::GrpcOptions;
use servers::tls::{TlsMode, TlsOption};
use snafu::{OptionExt, ResultExt};
use tracing_appender::non_blocking::WorkerGuard;
use crate::error::{self, Result};
use crate::error::{self, OtherSnafu, Result};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{App, create_resource_limit_metrics, log_versions, maybe_activate_heap_profile};
@@ -416,9 +420,16 @@ impl StartCommand {
layered_cache_registry.clone(),
)
.with_process_manager(process_manager.clone());
#[cfg(feature = "enterprise")]
let builder = if let Some(factories) = plugins.get() {
builder.with_extra_information_table_factories(factories)
let builder = if let Some(configurator) =
plugins.get::<CatalogManagerConfiguratorRef<CatalogManagerConfigureContext>>()
{
let ctx = CatalogManagerConfigureContext {
meta_client: meta_client.clone(),
};
configurator
.configure(builder, ctx)
.await
.context(OtherSnafu)?
} else {
builder
};
@@ -471,6 +482,11 @@ impl StartCommand {
}
}
/// The context for [`CatalogManagerConfigratorRef`] in frontend.
pub struct CatalogManagerConfigureContext {
pub meta_client: MetaClientRef,
}
#[cfg(test)]
mod tests {
use std::io::Write;

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;
use std::fmt::{self, Debug};
use std::path::Path;
use std::time::Duration;
@@ -23,7 +23,7 @@ use common_config::Configurable;
use common_telemetry::info;
use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
use common_version::{short_version, verbose_version};
use meta_srv::bootstrap::MetasrvInstance;
use meta_srv::bootstrap::{MetasrvInstance, metasrv_builder};
use meta_srv::metasrv::BackendImpl;
use snafu::ResultExt;
use tracing_appender::non_blocking::WorkerGuard;
@@ -177,7 +177,7 @@ pub struct StartCommand {
backend: Option<BackendImpl>,
}
impl fmt::Debug for StartCommand {
impl Debug for StartCommand {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("StartCommand")
.field("rpc_bind_addr", &self.rpc_bind_addr)
@@ -341,7 +341,7 @@ impl StartCommand {
.await
.context(StartMetaServerSnafu)?;
let builder = meta_srv::bootstrap::metasrv_builder(&opts, plugins, None)
let builder = metasrv_builder(&opts, plugins, None)
.await
.context(error::BuildMetaServerSnafu)?;
let metasrv = builder.build().await.context(error::BuildMetaServerSnafu)?;

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt::Debug;
use std::net::SocketAddr;
use std::path::Path;
use std::sync::Arc;
@@ -20,7 +21,7 @@ use std::{fs, path};
use async_trait::async_trait;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::information_schema::InformationExtensionRef;
use catalog::kvbackend::KvBackendCatalogManagerBuilder;
use catalog::kvbackend::{CatalogManagerConfiguratorRef, KvBackendCatalogManagerBuilder};
use catalog::process_manager::ProcessManager;
use clap::Parser;
use common_base::Plugins;
@@ -31,7 +32,7 @@ use common_meta::cache::LayeredCacheRegistryBuilder;
use common_meta::ddl::flow_meta::FlowMetadataAllocator;
use common_meta::ddl::table_meta::TableMetadataAllocator;
use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl};
use common_meta::ddl_manager::DdlManager;
use common_meta::ddl_manager::{DdlManager, DdlManagerConfiguratorRef, DdlManagerConfigureContext};
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
@@ -63,7 +64,7 @@ use standalone::StandaloneInformationExtension;
use standalone::options::StandaloneOptions;
use tracing_appender::non_blocking::WorkerGuard;
use crate::error::{Result, StartFlownodeSnafu};
use crate::error::{OtherSnafu, Result, StartFlownodeSnafu};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{App, create_resource_limit_metrics, error, log_versions, maybe_activate_heap_profile};
@@ -116,34 +117,15 @@ pub struct Instance {
flownode: FlownodeInstance,
procedure_manager: ProcedureManagerRef,
wal_options_allocator: WalOptionsAllocatorRef,
// The components of standalone, which make it easier to expand based
// on the components.
#[cfg(feature = "enterprise")]
components: Components,
// Keep the logging guard to prevent the worker from being dropped.
_guard: Vec<WorkerGuard>,
}
#[cfg(feature = "enterprise")]
pub struct Components {
pub plugins: Plugins,
pub kv_backend: KvBackendRef,
pub frontend_client: Arc<FrontendClient>,
pub catalog_manager: catalog::CatalogManagerRef,
}
impl Instance {
/// Find the socket addr of a server by its `name`.
pub fn server_addr(&self, name: &str) -> Option<SocketAddr> {
self.frontend.server_handlers().addr(name)
}
#[cfg(feature = "enterprise")]
pub fn components(&self) -> &Components {
&self.components
}
}
#[async_trait]
@@ -415,6 +397,13 @@ impl StartCommand {
plugins.insert::<InformationExtensionRef>(information_extension.clone());
let process_manager = Arc::new(ProcessManager::new(opts.grpc.server_addr.clone(), None));
// for standalone not use grpc, but get a handler to frontend grpc client without
// actually make a connection
let (frontend_client, frontend_instance_handler) =
FrontendClient::from_empty_grpc_handler(opts.query.clone());
let frontend_client = Arc::new(frontend_client);
let builder = KvBackendCatalogManagerBuilder::new(
information_extension.clone(),
kv_backend.clone(),
@@ -422,9 +411,16 @@ impl StartCommand {
)
.with_procedure_manager(procedure_manager.clone())
.with_process_manager(process_manager.clone());
#[cfg(feature = "enterprise")]
let builder = if let Some(factories) = plugins.get() {
builder.with_extra_information_table_factories(factories)
let builder = if let Some(configurator) =
plugins.get::<CatalogManagerConfiguratorRef<CatalogManagerConfigureContext>>()
{
let ctx = CatalogManagerConfigureContext {
fe_client: frontend_client.clone(),
};
configurator
.configure(builder, ctx)
.await
.context(OtherSnafu)?
} else {
builder
};
@@ -439,11 +435,6 @@ impl StartCommand {
..Default::default()
};
// for standalone not use grpc, but get a handler to frontend grpc client without
// actually make a connection
let (frontend_client, frontend_instance_handler) =
FrontendClient::from_empty_grpc_handler(opts.query.clone());
let frontend_client = Arc::new(frontend_client);
let flow_builder = FlownodeBuilder::new(
flownode_options,
plugins.clone(),
@@ -514,11 +505,17 @@ impl StartCommand {
let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager.clone(), true)
.context(error::InitDdlManagerSnafu)?;
#[cfg(feature = "enterprise")]
let ddl_manager = {
let trigger_ddl_manager: Option<common_meta::ddl_manager::TriggerDdlManagerRef> =
plugins.get();
ddl_manager.with_trigger_ddl_manager(trigger_ddl_manager)
let ddl_manager = if let Some(configurator) = plugins.get::<DdlManagerConfiguratorRef>() {
let ctx = DdlManagerConfigureContext {
kv_backend: kv_backend.clone(),
};
configurator
.configure(ddl_manager, ctx)
.await
.context(OtherSnafu)?
} else {
ddl_manager
};
let procedure_executor = Arc::new(LocalProcedureExecutor::new(
@@ -574,22 +571,12 @@ impl StartCommand {
heartbeat_task: None,
};
#[cfg(feature = "enterprise")]
let components = Components {
plugins,
kv_backend,
frontend_client,
catalog_manager,
};
Ok(Instance {
datanode,
frontend,
flownode,
procedure_manager,
wal_options_allocator,
#[cfg(feature = "enterprise")]
components,
_guard: guard,
})
}
@@ -608,6 +595,11 @@ impl StartCommand {
}
}
/// The context for [`CatalogManagerConfigratorRef`] in standalone.
pub struct CatalogManagerConfigureContext {
pub fe_client: Arc<FrontendClient>,
}
#[cfg(test)]
mod tests {
use std::default::Default;

View File

@@ -14,6 +14,7 @@
use std::sync::Arc;
use common_error::ext::BoxedError;
use common_procedure::{
BoxedProcedureLoader, Output, ProcedureId, ProcedureManagerRef, ProcedureWithId, watcher,
};
@@ -45,6 +46,7 @@ use crate::error::{
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use crate::kv_backend::KvBackendRef;
use crate::procedure_executor::ExecutorContext;
#[cfg(feature = "enterprise")]
use crate::rpc::ddl::DdlTask::CreateTrigger;
@@ -66,6 +68,23 @@ use crate::rpc::ddl::{
};
use crate::rpc::router::RegionRoute;
/// A configurator that customizes or enhances a [`DdlManager`].
#[async_trait::async_trait]
pub trait DdlManagerConfigurator: Send + Sync {
/// Configures the given [`DdlManager`] using the provided [`DdlManagerConfigureContext`].
async fn configure(
&self,
ddl_manager: DdlManager,
ctx: DdlManagerConfigureContext,
) -> std::result::Result<DdlManager, BoxedError>;
}
pub type DdlManagerConfiguratorRef = Arc<dyn DdlManagerConfigurator>;
pub struct DdlManagerConfigureContext {
pub kv_backend: KvBackendRef,
}
pub type DdlManagerRef = Arc<DdlManager>;
pub type BoxedProcedureLoaderFactory = dyn Fn(DdlContext) -> BoxedProcedureLoader;
@@ -148,11 +167,8 @@ impl DdlManager {
}
#[cfg(feature = "enterprise")]
pub fn with_trigger_ddl_manager(
mut self,
trigger_ddl_manager: Option<TriggerDdlManagerRef>,
) -> Self {
self.trigger_ddl_manager = trigger_ddl_manager;
pub fn with_trigger_ddl_manager(mut self, trigger_ddl_manager: TriggerDdlManagerRef) -> Self {
self.trigger_ddl_manager = Some(trigger_ddl_manager);
self
}

View File

@@ -32,15 +32,18 @@ use operator::flow::FlowServiceOperator;
use operator::insert::Inserter;
use operator::procedure::ProcedureServiceOperator;
use operator::request::Requester;
use operator::statement::{StatementExecutor, StatementExecutorRef};
use operator::statement::{
ExecutorConfigureContext, StatementExecutor, StatementExecutorConfiguratorRef,
StatementExecutorRef,
};
use operator::table::TableMutationOperator;
use partition::manager::PartitionRuleManager;
use pipeline::pipeline_operator::PipelineOperator;
use query::QueryEngineFactory;
use query::region_query::RegionQueryHandlerFactoryRef;
use snafu::OptionExt;
use snafu::{OptionExt, ResultExt};
use crate::error::{self, Result};
use crate::error::{self, ExternalSnafu, Result};
use crate::events::EventHandlerImpl;
use crate::frontend::FrontendOptions;
use crate::instance::Instance;
@@ -187,10 +190,15 @@ impl FrontendBuilder {
Some(process_manager.clone()),
);
#[cfg(feature = "enterprise")]
let statement_executor =
if let Some(factory) = plugins.get::<operator::statement::TriggerQuerierFactoryRef>() {
statement_executor.with_trigger_querier(factory.create(kv_backend.clone()))
if let Some(configurator) = plugins.get::<StatementExecutorConfiguratorRef>() {
let ctx = ExecutorConfigureContext {
kv_backend: kv_backend.clone(),
};
configurator
.configure(statement_executor, ctx)
.await
.context(ExternalSnafu)?
} else {
statement_executor
};

View File

@@ -29,7 +29,7 @@ use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
use common_telemetry::info;
use either::Either;
use servers::configurator::ConfiguratorRef;
use servers::configurator::GrpcRouterConfiguratorRef;
use servers::http::{HttpServer, HttpServerBuilder};
use servers::metrics_handler::MetricsHandler;
use servers::server::Server;
@@ -44,6 +44,7 @@ use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
use crate::election::CANDIDATE_LEASE_SECS;
use crate::election::etcd::EtcdElection;
use crate::error::OtherSnafu;
use crate::metasrv::builder::MetasrvBuilder;
use crate::metasrv::{
BackendImpl, ElectionRef, Metasrv, MetasrvOptions, SelectTarget, SelectorRef,
@@ -131,8 +132,15 @@ impl MetasrvInstance {
// Start gRPC server with admin services for backward compatibility
let mut router = router(self.metasrv.clone());
if let Some(configurator) = self.metasrv.plugins().get::<ConfiguratorRef>() {
router = configurator.config_grpc(router);
if let Some(configurator) = self
.metasrv
.plugins()
.get::<GrpcRouterConfiguratorRef<()>>()
{
router = configurator
.configure_grpc_router(router, ())
.await
.context(OtherSnafu)?;
}
let (serve_state_tx, serve_state_rx) = oneshot::channel();

View File

@@ -28,7 +28,7 @@ use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocato
use common_meta::ddl::{
DdlContext, NoopRegionFailureDetectorControl, RegionFailureDetectorControllerRef,
};
use common_meta::ddl_manager::DdlManager;
use common_meta::ddl_manager::{DdlManager, DdlManagerConfiguratorRef, DdlManagerConfigureContext};
use common_meta::distributed_time_constants::{self};
use common_meta::key::TableMetadataManager;
use common_meta::key::flow::FlowMetadataManager;
@@ -54,7 +54,7 @@ use store_api::storage::MAX_REGION_SEQ;
use crate::bootstrap::build_default_meta_peer_client;
use crate::cache_invalidator::MetasrvCacheInvalidator;
use crate::cluster::MetaPeerClientRef;
use crate::error::{self, BuildWalOptionsAllocatorSnafu, Result};
use crate::error::{self, BuildWalOptionsAllocatorSnafu, OtherSnafu, Result};
use crate::events::EventHandlerImpl;
use crate::gc::GcScheduler;
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
@@ -402,13 +402,22 @@ impl MetasrvBuilder {
let procedure_manager_c = procedure_manager.clone();
let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager_c, true)
.context(error::InitDdlManagerSnafu)?;
#[cfg(feature = "enterprise")]
let ddl_manager = {
let trigger_ddl_manager = plugins.as_ref().and_then(|plugins| {
plugins.get::<common_meta::ddl_manager::TriggerDdlManagerRef>()
});
ddl_manager.with_trigger_ddl_manager(trigger_ddl_manager)
let ddl_manager = if let Some(configurator) = plugins
.as_ref()
.and_then(|p| p.get::<DdlManagerConfiguratorRef>())
{
let ctx = DdlManagerConfigureContext {
kv_backend: kv_backend.clone(),
};
configurator
.configure(ddl_manager, ctx)
.await
.context(OtherSnafu)?
} else {
ddl_manager
};
let ddl_manager = Arc::new(ddl_manager);
let region_flush_ticker = if is_remote_wal {

View File

@@ -88,6 +88,22 @@ use crate::insert::InserterRef;
use crate::statement::copy_database::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};
use crate::statement::set::set_allow_query_fallback;
/// A configurator that customizes or enhances a [`StatementExecutor`].
#[async_trait::async_trait]
pub trait StatementExecutorConfigurator: Send + Sync {
async fn configure(
&self,
executor: StatementExecutor,
ctx: ExecutorConfigureContext,
) -> std::result::Result<StatementExecutor, BoxedError>;
}
pub type StatementExecutorConfiguratorRef = Arc<dyn StatementExecutorConfigurator>;
pub struct ExecutorConfigureContext {
pub kv_backend: KvBackendRef,
}
#[derive(Clone)]
pub struct StatementExecutor {
catalog_manager: CatalogManagerRef,
@@ -106,15 +122,6 @@ pub struct StatementExecutor {
pub type StatementExecutorRef = Arc<StatementExecutor>;
/// Trait for creating [`TriggerQuerier`] instance.
#[cfg(feature = "enterprise")]
pub trait TriggerQuerierFactory: Send + Sync {
fn create(&self, kv_backend: KvBackendRef) -> TriggerQuerierRef;
}
#[cfg(feature = "enterprise")]
pub type TriggerQuerierFactoryRef = Arc<dyn TriggerQuerierFactory>;
/// Trait for querying trigger info, such as `SHOW CREATE TRIGGER` etc.
#[cfg(feature = "enterprise")]
#[async_trait::async_trait]

View File

@@ -15,16 +15,45 @@
use std::sync::Arc;
use axum::Router as HttpRouter;
use common_error::ext::BoxedError;
use tonic::transport::server::Router as GrpcRouter;
pub trait Configurator: Send + Sync {
fn config_http(&self, route: HttpRouter) -> HttpRouter {
route
}
use crate::grpc::builder::GrpcServerBuilder;
fn config_grpc(&self, route: GrpcRouter) -> GrpcRouter {
route
}
/// A configurator that customizes or enhances an HTTP router.
#[async_trait::async_trait]
pub trait HttpConfigurator<C>: Send + Sync {
/// Configures the given HTTP router using the provided context.
async fn configure_http(
&self,
route: HttpRouter,
ctx: C,
) -> std::result::Result<HttpRouter, BoxedError>;
}
pub type ConfiguratorRef = Arc<dyn Configurator>;
pub type HttpConfiguratorRef<C> = Arc<dyn HttpConfigurator<C>>;
/// A configurator that customizes or enhances a gRPC router.
#[async_trait::async_trait]
pub trait GrpcRouterConfigurator<C>: Send + Sync {
/// Configures the given gRPC router using the provided context.
async fn configure_grpc_router(
&self,
route: GrpcRouter,
ctx: C,
) -> std::result::Result<GrpcRouter, BoxedError>;
}
pub type GrpcRouterConfiguratorRef<C> = Arc<dyn GrpcRouterConfigurator<C>>;
/// A configurator that customizes or enhances a [`GrpcServerBuilder`].
#[async_trait::async_trait]
pub trait GrpcBuilderConfigurator<C>: Send + Sync {
async fn configure(
&self,
builder: GrpcServerBuilder,
ctx: C,
) -> std::result::Result<GrpcServerBuilder, BoxedError>;
}
pub type GrpcBuilderConfiguratorRef<C> = Arc<dyn GrpcBuilderConfigurator<C>>;

View File

@@ -50,10 +50,11 @@ use tower_http::trace::TraceLayer;
use self::authorize::AuthState;
use self::result::table_result::TableResponse;
use crate::configurator::ConfiguratorRef;
use crate::configurator::HttpConfiguratorRef;
use crate::elasticsearch;
use crate::error::{
AddressBindSnafu, AlreadyStartedSnafu, Error, InternalIoSnafu, InvalidHeaderValueSnafu, Result,
AddressBindSnafu, AlreadyStartedSnafu, Error, InternalIoSnafu, InvalidHeaderValueSnafu,
OtherSnafu, Result,
};
use crate::http::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2};
use crate::http::otlp::OtlpState;
@@ -1205,8 +1206,11 @@ impl Server for HttpServer {
);
let mut app = self.make_app();
if let Some(configurator) = self.plugins.get::<ConfiguratorRef>() {
app = configurator.config_http(app);
if let Some(configurator) = self.plugins.get::<HttpConfiguratorRef<()>>() {
app = configurator
.configure_http(app, ())
.await
.context(OtherSnafu)?;
}
let app = self.build(app)?;
let listener = tokio::net::TcpListener::bind(listening)