feat: information table extension (#6434)

* feat: information table extension

* avoid use std HashMap behind cfg feature
This commit is contained in:
fys
2025-07-04 12:37:36 +08:00
committed by GitHub
parent 9b5baa965c
commit c5360601f5
16 changed files with 276 additions and 95 deletions

View File

@@ -5,6 +5,7 @@ edition.workspace = true
license.workspace = true
[features]
enterprise = []
testing = []
[lints]

View File

@@ -14,9 +14,11 @@
pub use client::{CachedKvBackend, CachedKvBackendBuilder, MetaKvBackend};
mod builder;
mod client;
mod manager;
mod table_cache;
pub use builder::KvBackendCatalogManagerBuilder;
pub use manager::KvBackendCatalogManager;
pub use table_cache::{new_table_cache, TableCache, TableCacheRef};

View File

@@ -0,0 +1,131 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
use common_meta::cache::LayeredCacheRegistryRef;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use common_procedure::ProcedureManagerRef;
use moka::sync::Cache;
use partition::manager::PartitionRuleManager;
#[cfg(feature = "enterprise")]
use crate::information_schema::InformationSchemaTableFactoryRef;
use crate::information_schema::{InformationExtensionRef, InformationSchemaProvider};
use crate::kvbackend::manager::{SystemCatalog, CATALOG_CACHE_MAX_CAPACITY};
use crate::kvbackend::KvBackendCatalogManager;
use crate::process_manager::ProcessManagerRef;
use crate::system_schema::pg_catalog::PGCatalogProvider;
pub struct KvBackendCatalogManagerBuilder {
information_extension: InformationExtensionRef,
backend: KvBackendRef,
cache_registry: LayeredCacheRegistryRef,
procedure_manager: Option<ProcedureManagerRef>,
process_manager: Option<ProcessManagerRef>,
#[cfg(feature = "enterprise")]
extra_information_table_factories:
std::collections::HashMap<String, InformationSchemaTableFactoryRef>,
}
impl KvBackendCatalogManagerBuilder {
pub fn new(
information_extension: InformationExtensionRef,
backend: KvBackendRef,
cache_registry: LayeredCacheRegistryRef,
) -> Self {
Self {
information_extension,
backend,
cache_registry,
procedure_manager: None,
process_manager: None,
#[cfg(feature = "enterprise")]
extra_information_table_factories: std::collections::HashMap::new(),
}
}
pub fn with_procedure_manager(mut self, procedure_manager: ProcedureManagerRef) -> Self {
self.procedure_manager = Some(procedure_manager);
self
}
pub fn with_process_manager(mut self, process_manager: ProcessManagerRef) -> Self {
self.process_manager = Some(process_manager);
self
}
/// Sets the extra information tables.
#[cfg(feature = "enterprise")]
pub fn with_extra_information_table_factories(
mut self,
factories: std::collections::HashMap<String, InformationSchemaTableFactoryRef>,
) -> Self {
self.extra_information_table_factories = factories;
self
}
pub fn build(self) -> Arc<KvBackendCatalogManager> {
let Self {
information_extension,
backend,
cache_registry,
procedure_manager,
process_manager,
#[cfg(feature = "enterprise")]
extra_information_table_factories,
} = self;
Arc::new_cyclic(|me| KvBackendCatalogManager {
information_extension,
partition_manager: Arc::new(PartitionRuleManager::new(
backend.clone(),
cache_registry
.get()
.expect("Failed to get table_route_cache"),
)),
table_metadata_manager: Arc::new(TableMetadataManager::new(backend.clone())),
system_catalog: SystemCatalog {
catalog_manager: me.clone(),
catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY),
pg_catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY),
information_schema_provider: {
let provider = InformationSchemaProvider::new(
DEFAULT_CATALOG_NAME.to_string(),
me.clone(),
Arc::new(FlowMetadataManager::new(backend.clone())),
process_manager.clone(),
backend.clone(),
);
#[cfg(feature = "enterprise")]
let provider = provider
.with_extra_table_factories(extra_information_table_factories.clone());
Arc::new(provider)
},
pg_catalog_provider: Arc::new(PGCatalogProvider::new(
DEFAULT_CATALOG_NAME.to_string(),
me.clone(),
)),
backend,
process_manager,
#[cfg(feature = "enterprise")]
extra_information_table_factories,
},
cache_registry,
procedure_manager,
})
}
}

View File

@@ -30,13 +30,13 @@ use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::table_info::{TableInfoManager, TableInfoValue};
use common_meta::key::table_name::TableNameKey;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::KvBackendRef;
use common_procedure::ProcedureManagerRef;
use futures_util::stream::BoxStream;
use futures_util::{StreamExt, TryStreamExt};
use moka::sync::Cache;
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
use partition::manager::PartitionRuleManagerRef;
use session::context::{Channel, QueryContext};
use snafu::prelude::*;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
@@ -52,6 +52,8 @@ use crate::error::{
CacheNotFoundSnafu, GetTableCacheSnafu, InvalidTableInfoInCatalogSnafu, ListCatalogsSnafu,
ListSchemasSnafu, ListTablesSnafu, Result, TableMetadataManagerSnafu,
};
#[cfg(feature = "enterprise")]
use crate::information_schema::InformationSchemaTableFactoryRef;
use crate::information_schema::{InformationExtensionRef, InformationSchemaProvider};
use crate::kvbackend::TableCacheRef;
use crate::process_manager::ProcessManagerRef;
@@ -67,60 +69,22 @@ use crate::CatalogManager;
#[derive(Clone)]
pub struct KvBackendCatalogManager {
/// Provides the extension methods for the `information_schema` tables
information_extension: InformationExtensionRef,
pub(super) information_extension: InformationExtensionRef,
/// Manages partition rules.
partition_manager: PartitionRuleManagerRef,
pub(super) partition_manager: PartitionRuleManagerRef,
/// Manages table metadata.
table_metadata_manager: TableMetadataManagerRef,
pub(super) table_metadata_manager: TableMetadataManagerRef,
/// A sub-CatalogManager that handles system tables
system_catalog: SystemCatalog,
pub(super) system_catalog: SystemCatalog,
/// Cache registry for all caches.
cache_registry: LayeredCacheRegistryRef,
pub(super) cache_registry: LayeredCacheRegistryRef,
/// Only available in `Standalone` mode.
procedure_manager: Option<ProcedureManagerRef>,
pub(super) procedure_manager: Option<ProcedureManagerRef>,
}
const CATALOG_CACHE_MAX_CAPACITY: u64 = 128;
pub(super) const CATALOG_CACHE_MAX_CAPACITY: u64 = 128;
impl KvBackendCatalogManager {
pub fn new(
information_extension: InformationExtensionRef,
backend: KvBackendRef,
cache_registry: LayeredCacheRegistryRef,
procedure_manager: Option<ProcedureManagerRef>,
process_manager: Option<ProcessManagerRef>,
) -> Arc<Self> {
Arc::new_cyclic(|me| Self {
information_extension,
partition_manager: Arc::new(PartitionRuleManager::new(
backend.clone(),
cache_registry
.get()
.expect("Failed to get table_route_cache"),
)),
table_metadata_manager: Arc::new(TableMetadataManager::new(backend.clone())),
system_catalog: SystemCatalog {
catalog_manager: me.clone(),
catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY),
pg_catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY),
information_schema_provider: Arc::new(InformationSchemaProvider::new(
DEFAULT_CATALOG_NAME.to_string(),
me.clone(),
Arc::new(FlowMetadataManager::new(backend.clone())),
process_manager.clone(),
)),
pg_catalog_provider: Arc::new(PGCatalogProvider::new(
DEFAULT_CATALOG_NAME.to_string(),
me.clone(),
)),
backend,
process_manager,
},
cache_registry,
procedure_manager,
})
}
pub fn view_info_cache(&self) -> Result<ViewInfoCacheRef> {
self.cache_registry.get().context(CacheNotFoundSnafu {
name: "view_info_cache",
@@ -507,16 +471,19 @@ fn build_table(table_info_value: TableInfoValue) -> Result<TableRef> {
/// - information_schema.{tables}
/// - pg_catalog.{tables}
#[derive(Clone)]
struct SystemCatalog {
catalog_manager: Weak<KvBackendCatalogManager>,
catalog_cache: Cache<String, Arc<InformationSchemaProvider>>,
pg_catalog_cache: Cache<String, Arc<PGCatalogProvider>>,
pub(super) struct SystemCatalog {
pub(super) catalog_manager: Weak<KvBackendCatalogManager>,
pub(super) catalog_cache: Cache<String, Arc<InformationSchemaProvider>>,
pub(super) pg_catalog_cache: Cache<String, Arc<PGCatalogProvider>>,
// system_schema_provider for default catalog
information_schema_provider: Arc<InformationSchemaProvider>,
pg_catalog_provider: Arc<PGCatalogProvider>,
backend: KvBackendRef,
process_manager: Option<ProcessManagerRef>,
pub(super) information_schema_provider: Arc<InformationSchemaProvider>,
pub(super) pg_catalog_provider: Arc<PGCatalogProvider>,
pub(super) backend: KvBackendRef,
pub(super) process_manager: Option<ProcessManagerRef>,
#[cfg(feature = "enterprise")]
pub(super) extra_information_table_factories:
std::collections::HashMap<String, InformationSchemaTableFactoryRef>,
}
impl SystemCatalog {
@@ -580,12 +547,17 @@ impl SystemCatalog {
if schema == INFORMATION_SCHEMA_NAME {
let information_schema_provider =
self.catalog_cache.get_with_by_ref(catalog, move || {
Arc::new(InformationSchemaProvider::new(
let provider = InformationSchemaProvider::new(
catalog.to_string(),
self.catalog_manager.clone(),
Arc::new(FlowMetadataManager::new(self.backend.clone())),
self.process_manager.clone(),
))
self.backend.clone(),
);
#[cfg(feature = "enterprise")]
let provider = provider
.with_extra_table_factories(self.extra_information_table_factories.clone());
Arc::new(provider)
});
information_schema_provider.table(table_name)
} else if schema == PG_CATALOG_NAME && channel == Channel::Postgres {

View File

@@ -352,11 +352,13 @@ impl MemoryCatalogManager {
}
fn create_catalog_entry(self: &Arc<Self>, catalog: String) -> SchemaEntries {
let backend = Arc::new(MemoryKvBackend::new());
let information_schema_provider = InformationSchemaProvider::new(
catalog,
Arc::downgrade(self) as Weak<dyn CatalogManager>,
Arc::new(FlowMetadataManager::new(Arc::new(MemoryKvBackend::new()))),
Arc::new(FlowMetadataManager::new(backend.clone())),
None, // we don't need ProcessManager on regions server.
backend,
);
let information_schema = information_schema_provider.tables().clone();

View File

@@ -15,7 +15,7 @@
pub mod information_schema;
mod memory_table;
pub mod pg_catalog;
mod predicate;
pub mod predicate;
mod utils;
use std::collections::HashMap;
@@ -96,7 +96,7 @@ trait SystemSchemaProviderInner {
}
}
pub(crate) trait SystemTable {
pub trait SystemTable {
fn table_id(&self) -> TableId;
fn table_name(&self) -> &'static str;
@@ -110,7 +110,7 @@ pub(crate) trait SystemTable {
}
}
pub(crate) type SystemTableRef = Arc<dyn SystemTable + Send + Sync>;
pub type SystemTableRef = Arc<dyn SystemTable + Send + Sync>;
struct SystemTableDataSource {
table: SystemTableRef,

View File

@@ -38,6 +38,7 @@ use common_meta::cluster::NodeInfo;
use common_meta::datanode::RegionStat;
use common_meta::key::flow::flow_state::FlowStat;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use common_procedure::ProcedureInfo;
use common_recordbatch::SendableRecordBatchStream;
use datatypes::schema::SchemaRef;
@@ -112,6 +113,25 @@ macro_rules! setup_memory_table {
};
}
#[cfg(feature = "enterprise")]
pub struct MakeInformationTableRequest {
pub catalog_name: String,
pub catalog_manager: Weak<dyn CatalogManager>,
pub kv_backend: KvBackendRef,
}
/// A factory trait for making information schema tables.
///
/// This trait allows for extensibility of the information schema by providing
/// a way to dynamically create custom information schema tables.
#[cfg(feature = "enterprise")]
pub trait InformationSchemaTableFactory {
fn make_information_table(&self, req: MakeInformationTableRequest) -> SystemTableRef;
}
#[cfg(feature = "enterprise")]
pub type InformationSchemaTableFactoryRef = Arc<dyn InformationSchemaTableFactory + Send + Sync>;
/// The `information_schema` tables info provider.
pub struct InformationSchemaProvider {
catalog_name: String,
@@ -119,6 +139,10 @@ pub struct InformationSchemaProvider {
process_manager: Option<ProcessManagerRef>,
flow_metadata_manager: Arc<FlowMetadataManager>,
tables: HashMap<String, TableRef>,
#[allow(dead_code)]
kv_backend: KvBackendRef,
#[cfg(feature = "enterprise")]
extra_table_factories: HashMap<String, InformationSchemaTableFactoryRef>,
}
impl SystemSchemaProvider for InformationSchemaProvider {
@@ -128,6 +152,7 @@ impl SystemSchemaProvider for InformationSchemaProvider {
&self.tables
}
}
impl SystemSchemaProviderInner for InformationSchemaProvider {
fn catalog_name(&self) -> &str {
&self.catalog_name
@@ -215,7 +240,22 @@ impl SystemSchemaProviderInner for InformationSchemaProvider {
.process_manager
.as_ref()
.map(|p| Arc::new(InformationSchemaProcessList::new(p.clone())) as _),
_ => None,
table_name => {
#[cfg(feature = "enterprise")]
return self.extra_table_factories.get(table_name).map(|factory| {
let req = MakeInformationTableRequest {
catalog_name: self.catalog_name.clone(),
catalog_manager: self.catalog_manager.clone(),
kv_backend: self.kv_backend.clone(),
};
factory.make_information_table(req)
});
#[cfg(not(feature = "enterprise"))]
{
let _ = table_name;
None
}
}
}
}
}
@@ -226,6 +266,7 @@ impl InformationSchemaProvider {
catalog_manager: Weak<dyn CatalogManager>,
flow_metadata_manager: Arc<FlowMetadataManager>,
process_manager: Option<ProcessManagerRef>,
kv_backend: KvBackendRef,
) -> Self {
let mut provider = Self {
catalog_name,
@@ -233,6 +274,9 @@ impl InformationSchemaProvider {
flow_metadata_manager,
process_manager,
tables: HashMap::new(),
kv_backend,
#[cfg(feature = "enterprise")]
extra_table_factories: HashMap::new(),
};
provider.build_tables();
@@ -240,6 +284,16 @@ impl InformationSchemaProvider {
provider
}
#[cfg(feature = "enterprise")]
pub(crate) fn with_extra_table_factories(
mut self,
factories: HashMap<String, InformationSchemaTableFactoryRef>,
) -> Self {
self.extra_table_factories = factories;
self.build_tables();
self
}
fn build_tables(&mut self) {
let mut tables = HashMap::new();
@@ -290,16 +344,19 @@ impl InformationSchemaProvider {
if let Some(process_list) = self.build_table(PROCESS_LIST) {
tables.insert(PROCESS_LIST.to_string(), process_list);
}
#[cfg(feature = "enterprise")]
for name in self.extra_table_factories.keys() {
tables.insert(name.to_string(), self.build_table(name).expect(name));
}
// Add memory tables
for name in MEMORY_TABLES.iter() {
tables.insert((*name).to_string(), self.build_table(name).expect(name));
}
self.tables = tables;
}
}
trait InformationTable {
pub trait InformationTable {
fn table_id(&self) -> TableId;
fn table_name(&self) -> &'static str;

View File

@@ -48,3 +48,4 @@ pub const FLOWS: &str = "flows";
pub const PROCEDURE_INFO: &str = "procedure_info";
pub const REGION_STATISTICS: &str = "region_statistics";
pub const PROCESS_LIST: &str = "process_list";
pub const TRIGGER_LIST: &str = "trigger_list";

View File

@@ -207,6 +207,7 @@ mod tests {
use session::context::QueryContext;
use super::*;
use crate::kvbackend::KvBackendCatalogManagerBuilder;
use crate::memory::MemoryCatalogManager;
#[test]
@@ -323,13 +324,13 @@ mod tests {
.build(),
);
let catalog_manager = KvBackendCatalogManager::new(
let catalog_manager = KvBackendCatalogManagerBuilder::new(
Arc::new(NoopInformationExtension),
backend.clone(),
layered_cache_registry,
None,
None,
);
)
.build();
let table_metadata_manager = TableMetadataManager::new(backend);
let mut view_info = common_meta::key::test_utils::new_test_table_info(1024, vec![]);
view_info.table_type = TableType::View;

View File

@@ -16,7 +16,7 @@ default = [
"meta-srv/pg_kvbackend",
"meta-srv/mysql_kvbackend",
]
enterprise = ["common-meta/enterprise", "frontend/enterprise", "meta-srv/enterprise"]
enterprise = ["common-meta/enterprise", "frontend/enterprise", "meta-srv/enterprise", "catalog/enterprise"]
tokio-console = ["common-telemetry/tokio-console"]
[lints]

View File

@@ -18,7 +18,7 @@ use std::time::Duration;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::information_extension::DistributedInformationExtension;
use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManagerBuilder, MetaKvBackend};
use clap::Parser;
use client::client_manager::NodeClients;
use common_base::Plugins;
@@ -342,13 +342,12 @@ impl StartCommand {
let information_extension =
Arc::new(DistributedInformationExtension::new(meta_client.clone()));
let catalog_manager = KvBackendCatalogManager::new(
let catalog_manager = KvBackendCatalogManagerBuilder::new(
information_extension,
cached_meta_backend.clone(),
layered_cache_registry.clone(),
None,
None,
);
)
.build();
let table_metadata_manager =
Arc::new(TableMetadataManager::new(cached_meta_backend.clone()));

View File

@@ -19,7 +19,7 @@ use std::time::Duration;
use async_trait::async_trait;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::information_extension::DistributedInformationExtension;
use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManagerBuilder, MetaKvBackend};
use catalog::process_manager::ProcessManager;
use clap::Parser;
use client::client_manager::NodeClients;
@@ -350,13 +350,20 @@ impl StartCommand {
addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
Some(meta_client.clone()),
));
let catalog_manager = KvBackendCatalogManager::new(
let builder = KvBackendCatalogManagerBuilder::new(
information_extension,
cached_meta_backend.clone(),
layered_cache_registry.clone(),
None,
Some(process_manager.clone()),
);
)
.with_process_manager(process_manager.clone());
#[cfg(feature = "enterprise")]
let builder = if let Some(factories) = plugins.get() {
builder.with_extra_information_table_factories(factories)
} else {
builder
};
let catalog_manager = builder.build();
let executor = HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler),

View File

@@ -20,7 +20,7 @@ use std::{fs, path};
use async_trait::async_trait;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::information_schema::InformationExtension;
use catalog::kvbackend::KvBackendCatalogManager;
use catalog::kvbackend::KvBackendCatalogManagerBuilder;
use catalog::process_manager::ProcessManager;
use clap::Parser;
use client::api::v1::meta::RegionRole;
@@ -544,13 +544,20 @@ impl StartCommand {
));
let process_manager = Arc::new(ProcessManager::new(opts.grpc.server_addr.clone(), None));
let catalog_manager = KvBackendCatalogManager::new(
let builder = KvBackendCatalogManagerBuilder::new(
information_extension.clone(),
kv_backend.clone(),
layered_cache_registry.clone(),
Some(procedure_manager.clone()),
Some(process_manager.clone()),
);
)
.with_procedure_manager(procedure_manager.clone())
.with_process_manager(process_manager.clone());
#[cfg(feature = "enterprise")]
let builder = if let Some(factories) = plugins.get() {
builder.with_extra_information_table_factories(factories)
} else {
builder
};
let catalog_manager = builder.build();
let table_metadata_manager =
Self::create_table_metadata_manager(kv_backend.clone()).await?;

View File

@@ -78,7 +78,7 @@ pub const INFORMATION_SCHEMA_ROUTINES_TABLE_ID: u32 = 21;
pub const INFORMATION_SCHEMA_SCHEMA_PRIVILEGES_TABLE_ID: u32 = 22;
/// id for information_schema.TABLE_PRIVILEGES
pub const INFORMATION_SCHEMA_TABLE_PRIVILEGES_TABLE_ID: u32 = 23;
/// id for information_schema.TRIGGERS
/// id for information_schema.TRIGGERS (for mysql)
pub const INFORMATION_SCHEMA_TRIGGERS_TABLE_ID: u32 = 24;
/// id for information_schema.GLOBAL_STATUS
pub const INFORMATION_SCHEMA_GLOBAL_STATUS_TABLE_ID: u32 = 25;
@@ -104,6 +104,8 @@ pub const INFORMATION_SCHEMA_PROCEDURE_INFO_TABLE_ID: u32 = 34;
pub const INFORMATION_SCHEMA_REGION_STATISTICS_TABLE_ID: u32 = 35;
/// id for information_schema.process_list
pub const INFORMATION_SCHEMA_PROCESS_LIST_TABLE_ID: u32 = 36;
/// id for information_schema.trigger_list (for greptimedb trigger)
pub const INFORMATION_SCHEMA_TRIGGER_TABLE_ID: u32 = 37;
// ----- End of information_schema tables -----

View File

@@ -24,7 +24,7 @@ use cache::{
with_default_composite_cache_registry,
};
use catalog::information_extension::DistributedInformationExtension;
use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManagerBuilder, MetaKvBackend};
use catalog::process_manager::ProcessManager;
use client::client_manager::NodeClients;
use client::Client;
@@ -381,13 +381,12 @@ impl GreptimeDbClusterBuilder {
let information_extension =
Arc::new(DistributedInformationExtension::new(meta_client.clone()));
let catalog_manager = KvBackendCatalogManager::new(
let catalog_manager = KvBackendCatalogManagerBuilder::new(
information_extension,
cached_meta_backend.clone(),
cache_registry.clone(),
None,
None,
);
)
.build();
let handlers_executor = HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler),

View File

@@ -19,7 +19,7 @@ use cache::{
with_default_composite_cache_registry,
};
use catalog::information_schema::NoopInformationExtension;
use catalog::kvbackend::KvBackendCatalogManager;
use catalog::kvbackend::KvBackendCatalogManagerBuilder;
use catalog::process_manager::ProcessManager;
use cmd::error::StartFlownodeSnafu;
use cmd::standalone::StandaloneOptions;
@@ -163,13 +163,13 @@ impl GreptimeDbStandaloneBuilder {
.build(),
);
let catalog_manager = KvBackendCatalogManager::new(
let catalog_manager = KvBackendCatalogManagerBuilder::new(
Arc::new(NoopInformationExtension),
kv_backend.clone(),
cache_registry.clone(),
Some(procedure_manager.clone()),
None,
);
)
.with_procedure_manager(procedure_manager.clone())
.build();
let (frontend_client, frontend_instance_handler) =
FrontendClient::from_empty_grpc_handler(opts.query.clone());