From c5360601f588bed48bc374a4f62129fb431bc3dc Mon Sep 17 00:00:00 2001 From: fys <40801205+fengys1996@users.noreply.github.com> Date: Fri, 4 Jul 2025 12:37:36 +0800 Subject: [PATCH] feat: information table extension (#6434) * feat: information table extension * avoid use std HashMap behind cfg feature --- src/catalog/Cargo.toml | 1 + src/catalog/src/kvbackend.rs | 2 + src/catalog/src/kvbackend/builder.rs | 131 ++++++++++++++++++ src/catalog/src/kvbackend/manager.rs | 86 ++++-------- src/catalog/src/memory/manager.rs | 4 +- src/catalog/src/system_schema.rs | 6 +- .../src/system_schema/information_schema.rs | 63 ++++++++- .../information_schema/table_names.rs | 1 + src/catalog/src/table_source.rs | 9 +- src/cmd/Cargo.toml | 2 +- src/cmd/src/flownode.rs | 9 +- src/cmd/src/frontend.rs | 17 ++- src/cmd/src/standalone.rs | 17 ++- src/common/catalog/src/consts.rs | 4 +- tests-integration/src/cluster.rs | 9 +- tests-integration/src/standalone.rs | 10 +- 16 files changed, 276 insertions(+), 95 deletions(-) create mode 100644 src/catalog/src/kvbackend/builder.rs diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index c7e2782c0e..4f749f49b2 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -5,6 +5,7 @@ edition.workspace = true license.workspace = true [features] +enterprise = [] testing = [] [lints] diff --git a/src/catalog/src/kvbackend.rs b/src/catalog/src/kvbackend.rs index b2a79b5e7f..3f61894a30 100644 --- a/src/catalog/src/kvbackend.rs +++ b/src/catalog/src/kvbackend.rs @@ -14,9 +14,11 @@ pub use client::{CachedKvBackend, CachedKvBackendBuilder, MetaKvBackend}; +mod builder; mod client; mod manager; mod table_cache; +pub use builder::KvBackendCatalogManagerBuilder; pub use manager::KvBackendCatalogManager; pub use table_cache::{new_table_cache, TableCache, TableCacheRef}; diff --git a/src/catalog/src/kvbackend/builder.rs b/src/catalog/src/kvbackend/builder.rs new file mode 100644 index 0000000000..2834673baa --- /dev/null +++ b/src/catalog/src/kvbackend/builder.rs @@ -0,0 +1,131 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_catalog::consts::DEFAULT_CATALOG_NAME; +use common_meta::cache::LayeredCacheRegistryRef; +use common_meta::key::flow::FlowMetadataManager; +use common_meta::key::TableMetadataManager; +use common_meta::kv_backend::KvBackendRef; +use common_procedure::ProcedureManagerRef; +use moka::sync::Cache; +use partition::manager::PartitionRuleManager; + +#[cfg(feature = "enterprise")] +use crate::information_schema::InformationSchemaTableFactoryRef; +use crate::information_schema::{InformationExtensionRef, InformationSchemaProvider}; +use crate::kvbackend::manager::{SystemCatalog, CATALOG_CACHE_MAX_CAPACITY}; +use crate::kvbackend::KvBackendCatalogManager; +use crate::process_manager::ProcessManagerRef; +use crate::system_schema::pg_catalog::PGCatalogProvider; + +pub struct KvBackendCatalogManagerBuilder { + information_extension: InformationExtensionRef, + backend: KvBackendRef, + cache_registry: LayeredCacheRegistryRef, + procedure_manager: Option, + process_manager: Option, + #[cfg(feature = "enterprise")] + extra_information_table_factories: + std::collections::HashMap, +} + +impl KvBackendCatalogManagerBuilder { + pub fn new( + information_extension: InformationExtensionRef, + backend: KvBackendRef, + cache_registry: LayeredCacheRegistryRef, + ) -> Self { + Self { + information_extension, + backend, + cache_registry, + procedure_manager: None, + process_manager: None, + #[cfg(feature = "enterprise")] + extra_information_table_factories: std::collections::HashMap::new(), + } + } + + pub fn with_procedure_manager(mut self, procedure_manager: ProcedureManagerRef) -> Self { + self.procedure_manager = Some(procedure_manager); + self + } + + pub fn with_process_manager(mut self, process_manager: ProcessManagerRef) -> Self { + self.process_manager = Some(process_manager); + self + } + + /// Sets the extra information tables. + #[cfg(feature = "enterprise")] + pub fn with_extra_information_table_factories( + mut self, + factories: std::collections::HashMap, + ) -> Self { + self.extra_information_table_factories = factories; + self + } + + pub fn build(self) -> Arc { + let Self { + information_extension, + backend, + cache_registry, + procedure_manager, + process_manager, + #[cfg(feature = "enterprise")] + extra_information_table_factories, + } = self; + Arc::new_cyclic(|me| KvBackendCatalogManager { + information_extension, + partition_manager: Arc::new(PartitionRuleManager::new( + backend.clone(), + cache_registry + .get() + .expect("Failed to get table_route_cache"), + )), + table_metadata_manager: Arc::new(TableMetadataManager::new(backend.clone())), + system_catalog: SystemCatalog { + catalog_manager: me.clone(), + catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY), + pg_catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY), + information_schema_provider: { + let provider = InformationSchemaProvider::new( + DEFAULT_CATALOG_NAME.to_string(), + me.clone(), + Arc::new(FlowMetadataManager::new(backend.clone())), + process_manager.clone(), + backend.clone(), + ); + #[cfg(feature = "enterprise")] + let provider = provider + .with_extra_table_factories(extra_information_table_factories.clone()); + Arc::new(provider) + }, + pg_catalog_provider: Arc::new(PGCatalogProvider::new( + DEFAULT_CATALOG_NAME.to_string(), + me.clone(), + )), + backend, + process_manager, + #[cfg(feature = "enterprise")] + extra_information_table_factories, + }, + cache_registry, + procedure_manager, + }) + } +} diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs index 0cfab086d1..58a3ab0adb 100644 --- a/src/catalog/src/kvbackend/manager.rs +++ b/src/catalog/src/kvbackend/manager.rs @@ -30,13 +30,13 @@ use common_meta::key::flow::FlowMetadataManager; use common_meta::key::schema_name::SchemaNameKey; use common_meta::key::table_info::{TableInfoManager, TableInfoValue}; use common_meta::key::table_name::TableNameKey; -use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; +use common_meta::key::TableMetadataManagerRef; use common_meta::kv_backend::KvBackendRef; use common_procedure::ProcedureManagerRef; use futures_util::stream::BoxStream; use futures_util::{StreamExt, TryStreamExt}; use moka::sync::Cache; -use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef}; +use partition::manager::PartitionRuleManagerRef; use session::context::{Channel, QueryContext}; use snafu::prelude::*; use store_api::metric_engine_consts::METRIC_ENGINE_NAME; @@ -52,6 +52,8 @@ use crate::error::{ CacheNotFoundSnafu, GetTableCacheSnafu, InvalidTableInfoInCatalogSnafu, ListCatalogsSnafu, ListSchemasSnafu, ListTablesSnafu, Result, TableMetadataManagerSnafu, }; +#[cfg(feature = "enterprise")] +use crate::information_schema::InformationSchemaTableFactoryRef; use crate::information_schema::{InformationExtensionRef, InformationSchemaProvider}; use crate::kvbackend::TableCacheRef; use crate::process_manager::ProcessManagerRef; @@ -67,60 +69,22 @@ use crate::CatalogManager; #[derive(Clone)] pub struct KvBackendCatalogManager { /// Provides the extension methods for the `information_schema` tables - information_extension: InformationExtensionRef, + pub(super) information_extension: InformationExtensionRef, /// Manages partition rules. - partition_manager: PartitionRuleManagerRef, + pub(super) partition_manager: PartitionRuleManagerRef, /// Manages table metadata. - table_metadata_manager: TableMetadataManagerRef, + pub(super) table_metadata_manager: TableMetadataManagerRef, /// A sub-CatalogManager that handles system tables - system_catalog: SystemCatalog, + pub(super) system_catalog: SystemCatalog, /// Cache registry for all caches. - cache_registry: LayeredCacheRegistryRef, + pub(super) cache_registry: LayeredCacheRegistryRef, /// Only available in `Standalone` mode. - procedure_manager: Option, + pub(super) procedure_manager: Option, } -const CATALOG_CACHE_MAX_CAPACITY: u64 = 128; +pub(super) const CATALOG_CACHE_MAX_CAPACITY: u64 = 128; impl KvBackendCatalogManager { - pub fn new( - information_extension: InformationExtensionRef, - backend: KvBackendRef, - cache_registry: LayeredCacheRegistryRef, - procedure_manager: Option, - process_manager: Option, - ) -> Arc { - Arc::new_cyclic(|me| Self { - information_extension, - partition_manager: Arc::new(PartitionRuleManager::new( - backend.clone(), - cache_registry - .get() - .expect("Failed to get table_route_cache"), - )), - table_metadata_manager: Arc::new(TableMetadataManager::new(backend.clone())), - system_catalog: SystemCatalog { - catalog_manager: me.clone(), - catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY), - pg_catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY), - information_schema_provider: Arc::new(InformationSchemaProvider::new( - DEFAULT_CATALOG_NAME.to_string(), - me.clone(), - Arc::new(FlowMetadataManager::new(backend.clone())), - process_manager.clone(), - )), - pg_catalog_provider: Arc::new(PGCatalogProvider::new( - DEFAULT_CATALOG_NAME.to_string(), - me.clone(), - )), - backend, - process_manager, - }, - cache_registry, - procedure_manager, - }) - } - pub fn view_info_cache(&self) -> Result { self.cache_registry.get().context(CacheNotFoundSnafu { name: "view_info_cache", @@ -507,16 +471,19 @@ fn build_table(table_info_value: TableInfoValue) -> Result { /// - information_schema.{tables} /// - pg_catalog.{tables} #[derive(Clone)] -struct SystemCatalog { - catalog_manager: Weak, - catalog_cache: Cache>, - pg_catalog_cache: Cache>, +pub(super) struct SystemCatalog { + pub(super) catalog_manager: Weak, + pub(super) catalog_cache: Cache>, + pub(super) pg_catalog_cache: Cache>, // system_schema_provider for default catalog - information_schema_provider: Arc, - pg_catalog_provider: Arc, - backend: KvBackendRef, - process_manager: Option, + pub(super) information_schema_provider: Arc, + pub(super) pg_catalog_provider: Arc, + pub(super) backend: KvBackendRef, + pub(super) process_manager: Option, + #[cfg(feature = "enterprise")] + pub(super) extra_information_table_factories: + std::collections::HashMap, } impl SystemCatalog { @@ -580,12 +547,17 @@ impl SystemCatalog { if schema == INFORMATION_SCHEMA_NAME { let information_schema_provider = self.catalog_cache.get_with_by_ref(catalog, move || { - Arc::new(InformationSchemaProvider::new( + let provider = InformationSchemaProvider::new( catalog.to_string(), self.catalog_manager.clone(), Arc::new(FlowMetadataManager::new(self.backend.clone())), self.process_manager.clone(), - )) + self.backend.clone(), + ); + #[cfg(feature = "enterprise")] + let provider = provider + .with_extra_table_factories(self.extra_information_table_factories.clone()); + Arc::new(provider) }); information_schema_provider.table(table_name) } else if schema == PG_CATALOG_NAME && channel == Channel::Postgres { diff --git a/src/catalog/src/memory/manager.rs b/src/catalog/src/memory/manager.rs index 9e5d18141d..b58d2b1e5f 100644 --- a/src/catalog/src/memory/manager.rs +++ b/src/catalog/src/memory/manager.rs @@ -352,11 +352,13 @@ impl MemoryCatalogManager { } fn create_catalog_entry(self: &Arc, catalog: String) -> SchemaEntries { + let backend = Arc::new(MemoryKvBackend::new()); let information_schema_provider = InformationSchemaProvider::new( catalog, Arc::downgrade(self) as Weak, - Arc::new(FlowMetadataManager::new(Arc::new(MemoryKvBackend::new()))), + Arc::new(FlowMetadataManager::new(backend.clone())), None, // we don't need ProcessManager on regions server. + backend, ); let information_schema = information_schema_provider.tables().clone(); diff --git a/src/catalog/src/system_schema.rs b/src/catalog/src/system_schema.rs index 0f4f74b221..15191a818a 100644 --- a/src/catalog/src/system_schema.rs +++ b/src/catalog/src/system_schema.rs @@ -15,7 +15,7 @@ pub mod information_schema; mod memory_table; pub mod pg_catalog; -mod predicate; +pub mod predicate; mod utils; use std::collections::HashMap; @@ -96,7 +96,7 @@ trait SystemSchemaProviderInner { } } -pub(crate) trait SystemTable { +pub trait SystemTable { fn table_id(&self) -> TableId; fn table_name(&self) -> &'static str; @@ -110,7 +110,7 @@ pub(crate) trait SystemTable { } } -pub(crate) type SystemTableRef = Arc; +pub type SystemTableRef = Arc; struct SystemTableDataSource { table: SystemTableRef, diff --git a/src/catalog/src/system_schema/information_schema.rs b/src/catalog/src/system_schema/information_schema.rs index 28f1120b67..ca534022b0 100644 --- a/src/catalog/src/system_schema/information_schema.rs +++ b/src/catalog/src/system_schema/information_schema.rs @@ -38,6 +38,7 @@ use common_meta::cluster::NodeInfo; use common_meta::datanode::RegionStat; use common_meta::key::flow::flow_state::FlowStat; use common_meta::key::flow::FlowMetadataManager; +use common_meta::kv_backend::KvBackendRef; use common_procedure::ProcedureInfo; use common_recordbatch::SendableRecordBatchStream; use datatypes::schema::SchemaRef; @@ -112,6 +113,25 @@ macro_rules! setup_memory_table { }; } +#[cfg(feature = "enterprise")] +pub struct MakeInformationTableRequest { + pub catalog_name: String, + pub catalog_manager: Weak, + pub kv_backend: KvBackendRef, +} + +/// A factory trait for making information schema tables. +/// +/// This trait allows for extensibility of the information schema by providing +/// a way to dynamically create custom information schema tables. +#[cfg(feature = "enterprise")] +pub trait InformationSchemaTableFactory { + fn make_information_table(&self, req: MakeInformationTableRequest) -> SystemTableRef; +} + +#[cfg(feature = "enterprise")] +pub type InformationSchemaTableFactoryRef = Arc; + /// The `information_schema` tables info provider. pub struct InformationSchemaProvider { catalog_name: String, @@ -119,6 +139,10 @@ pub struct InformationSchemaProvider { process_manager: Option, flow_metadata_manager: Arc, tables: HashMap, + #[allow(dead_code)] + kv_backend: KvBackendRef, + #[cfg(feature = "enterprise")] + extra_table_factories: HashMap, } impl SystemSchemaProvider for InformationSchemaProvider { @@ -128,6 +152,7 @@ impl SystemSchemaProvider for InformationSchemaProvider { &self.tables } } + impl SystemSchemaProviderInner for InformationSchemaProvider { fn catalog_name(&self) -> &str { &self.catalog_name @@ -215,7 +240,22 @@ impl SystemSchemaProviderInner for InformationSchemaProvider { .process_manager .as_ref() .map(|p| Arc::new(InformationSchemaProcessList::new(p.clone())) as _), - _ => None, + table_name => { + #[cfg(feature = "enterprise")] + return self.extra_table_factories.get(table_name).map(|factory| { + let req = MakeInformationTableRequest { + catalog_name: self.catalog_name.clone(), + catalog_manager: self.catalog_manager.clone(), + kv_backend: self.kv_backend.clone(), + }; + factory.make_information_table(req) + }); + #[cfg(not(feature = "enterprise"))] + { + let _ = table_name; + None + } + } } } } @@ -226,6 +266,7 @@ impl InformationSchemaProvider { catalog_manager: Weak, flow_metadata_manager: Arc, process_manager: Option, + kv_backend: KvBackendRef, ) -> Self { let mut provider = Self { catalog_name, @@ -233,6 +274,9 @@ impl InformationSchemaProvider { flow_metadata_manager, process_manager, tables: HashMap::new(), + kv_backend, + #[cfg(feature = "enterprise")] + extra_table_factories: HashMap::new(), }; provider.build_tables(); @@ -240,6 +284,16 @@ impl InformationSchemaProvider { provider } + #[cfg(feature = "enterprise")] + pub(crate) fn with_extra_table_factories( + mut self, + factories: HashMap, + ) -> Self { + self.extra_table_factories = factories; + self.build_tables(); + self + } + fn build_tables(&mut self) { let mut tables = HashMap::new(); @@ -290,16 +344,19 @@ impl InformationSchemaProvider { if let Some(process_list) = self.build_table(PROCESS_LIST) { tables.insert(PROCESS_LIST.to_string(), process_list); } + #[cfg(feature = "enterprise")] + for name in self.extra_table_factories.keys() { + tables.insert(name.to_string(), self.build_table(name).expect(name)); + } // Add memory tables for name in MEMORY_TABLES.iter() { tables.insert((*name).to_string(), self.build_table(name).expect(name)); } - self.tables = tables; } } -trait InformationTable { +pub trait InformationTable { fn table_id(&self) -> TableId; fn table_name(&self) -> &'static str; diff --git a/src/catalog/src/system_schema/information_schema/table_names.rs b/src/catalog/src/system_schema/information_schema/table_names.rs index 95e8fe74f4..809450219c 100644 --- a/src/catalog/src/system_schema/information_schema/table_names.rs +++ b/src/catalog/src/system_schema/information_schema/table_names.rs @@ -48,3 +48,4 @@ pub const FLOWS: &str = "flows"; pub const PROCEDURE_INFO: &str = "procedure_info"; pub const REGION_STATISTICS: &str = "region_statistics"; pub const PROCESS_LIST: &str = "process_list"; +pub const TRIGGER_LIST: &str = "trigger_list"; diff --git a/src/catalog/src/table_source.rs b/src/catalog/src/table_source.rs index 0d72850697..5f803fd079 100644 --- a/src/catalog/src/table_source.rs +++ b/src/catalog/src/table_source.rs @@ -207,6 +207,7 @@ mod tests { use session::context::QueryContext; use super::*; + use crate::kvbackend::KvBackendCatalogManagerBuilder; use crate::memory::MemoryCatalogManager; #[test] @@ -323,13 +324,13 @@ mod tests { .build(), ); - let catalog_manager = KvBackendCatalogManager::new( + let catalog_manager = KvBackendCatalogManagerBuilder::new( Arc::new(NoopInformationExtension), backend.clone(), layered_cache_registry, - None, - None, - ); + ) + .build(); + let table_metadata_manager = TableMetadataManager::new(backend); let mut view_info = common_meta::key::test_utils::new_test_table_info(1024, vec![]); view_info.table_type = TableType::View; diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index e5a0f29e22..9815471ad9 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -16,7 +16,7 @@ default = [ "meta-srv/pg_kvbackend", "meta-srv/mysql_kvbackend", ] -enterprise = ["common-meta/enterprise", "frontend/enterprise", "meta-srv/enterprise"] +enterprise = ["common-meta/enterprise", "frontend/enterprise", "meta-srv/enterprise", "catalog/enterprise"] tokio-console = ["common-telemetry/tokio-console"] [lints] diff --git a/src/cmd/src/flownode.rs b/src/cmd/src/flownode.rs index 52a26857cb..bd8e411dd6 100644 --- a/src/cmd/src/flownode.rs +++ b/src/cmd/src/flownode.rs @@ -18,7 +18,7 @@ use std::time::Duration; use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry}; use catalog::information_extension::DistributedInformationExtension; -use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend}; +use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManagerBuilder, MetaKvBackend}; use clap::Parser; use client::client_manager::NodeClients; use common_base::Plugins; @@ -342,13 +342,12 @@ impl StartCommand { let information_extension = Arc::new(DistributedInformationExtension::new(meta_client.clone())); - let catalog_manager = KvBackendCatalogManager::new( + let catalog_manager = KvBackendCatalogManagerBuilder::new( information_extension, cached_meta_backend.clone(), layered_cache_registry.clone(), - None, - None, - ); + ) + .build(); let table_metadata_manager = Arc::new(TableMetadataManager::new(cached_meta_backend.clone())); diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index 3953705b38..f4e1a4fe36 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -19,7 +19,7 @@ use std::time::Duration; use async_trait::async_trait; use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry}; use catalog::information_extension::DistributedInformationExtension; -use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend}; +use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManagerBuilder, MetaKvBackend}; use catalog::process_manager::ProcessManager; use clap::Parser; use client::client_manager::NodeClients; @@ -350,13 +350,20 @@ impl StartCommand { addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)), Some(meta_client.clone()), )); - let catalog_manager = KvBackendCatalogManager::new( + + let builder = KvBackendCatalogManagerBuilder::new( information_extension, cached_meta_backend.clone(), layered_cache_registry.clone(), - None, - Some(process_manager.clone()), - ); + ) + .with_process_manager(process_manager.clone()); + #[cfg(feature = "enterprise")] + let builder = if let Some(factories) = plugins.get() { + builder.with_extra_information_table_factories(factories) + } else { + builder + }; + let catalog_manager = builder.build(); let executor = HandlerGroupExecutor::new(vec![ Arc::new(ParseMailboxMessageHandler), diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index b211ad83b9..82b8d1c51e 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -20,7 +20,7 @@ use std::{fs, path}; use async_trait::async_trait; use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry}; use catalog::information_schema::InformationExtension; -use catalog::kvbackend::KvBackendCatalogManager; +use catalog::kvbackend::KvBackendCatalogManagerBuilder; use catalog::process_manager::ProcessManager; use clap::Parser; use client::api::v1::meta::RegionRole; @@ -544,13 +544,20 @@ impl StartCommand { )); let process_manager = Arc::new(ProcessManager::new(opts.grpc.server_addr.clone(), None)); - let catalog_manager = KvBackendCatalogManager::new( + let builder = KvBackendCatalogManagerBuilder::new( information_extension.clone(), kv_backend.clone(), layered_cache_registry.clone(), - Some(procedure_manager.clone()), - Some(process_manager.clone()), - ); + ) + .with_procedure_manager(procedure_manager.clone()) + .with_process_manager(process_manager.clone()); + #[cfg(feature = "enterprise")] + let builder = if let Some(factories) = plugins.get() { + builder.with_extra_information_table_factories(factories) + } else { + builder + }; + let catalog_manager = builder.build(); let table_metadata_manager = Self::create_table_metadata_manager(kv_backend.clone()).await?; diff --git a/src/common/catalog/src/consts.rs b/src/common/catalog/src/consts.rs index 90bc347c1a..ad7c5c2a45 100644 --- a/src/common/catalog/src/consts.rs +++ b/src/common/catalog/src/consts.rs @@ -78,7 +78,7 @@ pub const INFORMATION_SCHEMA_ROUTINES_TABLE_ID: u32 = 21; pub const INFORMATION_SCHEMA_SCHEMA_PRIVILEGES_TABLE_ID: u32 = 22; /// id for information_schema.TABLE_PRIVILEGES pub const INFORMATION_SCHEMA_TABLE_PRIVILEGES_TABLE_ID: u32 = 23; -/// id for information_schema.TRIGGERS +/// id for information_schema.TRIGGERS (for mysql) pub const INFORMATION_SCHEMA_TRIGGERS_TABLE_ID: u32 = 24; /// id for information_schema.GLOBAL_STATUS pub const INFORMATION_SCHEMA_GLOBAL_STATUS_TABLE_ID: u32 = 25; @@ -104,6 +104,8 @@ pub const INFORMATION_SCHEMA_PROCEDURE_INFO_TABLE_ID: u32 = 34; pub const INFORMATION_SCHEMA_REGION_STATISTICS_TABLE_ID: u32 = 35; /// id for information_schema.process_list pub const INFORMATION_SCHEMA_PROCESS_LIST_TABLE_ID: u32 = 36; +/// id for information_schema.trigger_list (for greptimedb trigger) +pub const INFORMATION_SCHEMA_TRIGGER_TABLE_ID: u32 = 37; // ----- End of information_schema tables ----- diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index c76b31f794..2b82952f18 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -24,7 +24,7 @@ use cache::{ with_default_composite_cache_registry, }; use catalog::information_extension::DistributedInformationExtension; -use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend}; +use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManagerBuilder, MetaKvBackend}; use catalog::process_manager::ProcessManager; use client::client_manager::NodeClients; use client::Client; @@ -381,13 +381,12 @@ impl GreptimeDbClusterBuilder { let information_extension = Arc::new(DistributedInformationExtension::new(meta_client.clone())); - let catalog_manager = KvBackendCatalogManager::new( + let catalog_manager = KvBackendCatalogManagerBuilder::new( information_extension, cached_meta_backend.clone(), cache_registry.clone(), - None, - None, - ); + ) + .build(); let handlers_executor = HandlerGroupExecutor::new(vec![ Arc::new(ParseMailboxMessageHandler), diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 4b15d13530..76c8d2d2a6 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -19,7 +19,7 @@ use cache::{ with_default_composite_cache_registry, }; use catalog::information_schema::NoopInformationExtension; -use catalog::kvbackend::KvBackendCatalogManager; +use catalog::kvbackend::KvBackendCatalogManagerBuilder; use catalog::process_manager::ProcessManager; use cmd::error::StartFlownodeSnafu; use cmd::standalone::StandaloneOptions; @@ -163,13 +163,13 @@ impl GreptimeDbStandaloneBuilder { .build(), ); - let catalog_manager = KvBackendCatalogManager::new( + let catalog_manager = KvBackendCatalogManagerBuilder::new( Arc::new(NoopInformationExtension), kv_backend.clone(), cache_registry.clone(), - Some(procedure_manager.clone()), - None, - ); + ) + .with_procedure_manager(procedure_manager.clone()) + .build(); let (frontend_client, frontend_instance_handler) = FrontendClient::from_empty_grpc_handler(opts.query.clone());