mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-07 13:52:59 +00:00
refactor: cache invalidator (#3611)
* chore: remove some alias * refactor: cache invalidator
This commit is contained in:
@@ -23,8 +23,7 @@ use common_catalog::consts::{
|
||||
};
|
||||
use common_catalog::format_full_table_name;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::cache_invalidator::{CacheInvalidator, CacheInvalidatorRef, Context};
|
||||
use common_meta::error::Result as MetaResult;
|
||||
use common_meta::cache_invalidator::{CacheInvalidator, Context, MultiCacheInvalidator};
|
||||
use common_meta::instruction::CacheIdent;
|
||||
use common_meta::key::catalog_name::CatalogNameKey;
|
||||
use common_meta::key::schema_name::SchemaNameKey;
|
||||
@@ -44,8 +43,8 @@ use table::TableRef;
|
||||
|
||||
use crate::error::Error::{GetTableCache, TableCacheNotGet};
|
||||
use crate::error::{
|
||||
self as catalog_err, ListCatalogsSnafu, ListSchemasSnafu, ListTablesSnafu,
|
||||
Result as CatalogResult, TableCacheNotGetSnafu, TableMetadataManagerSnafu,
|
||||
InvalidTableInfoInCatalogSnafu, ListCatalogsSnafu, ListSchemasSnafu, ListTablesSnafu, Result,
|
||||
TableCacheNotGetSnafu, TableMetadataManagerSnafu,
|
||||
};
|
||||
use crate::information_schema::InformationSchemaProvider;
|
||||
use crate::CatalogManager;
|
||||
@@ -57,10 +56,6 @@ use crate::CatalogManager;
|
||||
/// comes from `SystemCatalog`, which is static and read-only.
|
||||
#[derive(Clone)]
|
||||
pub struct KvBackendCatalogManager {
|
||||
// TODO(LFC): Maybe use a real implementation for Standalone mode.
|
||||
// Now we use `NoopKvCacheInvalidator` for Standalone mode. In Standalone mode, the KV backend
|
||||
// is implemented by RaftEngine. Maybe we need a cache for it?
|
||||
cache_invalidator: CacheInvalidatorRef,
|
||||
partition_manager: PartitionRuleManagerRef,
|
||||
table_metadata_manager: TableMetadataManagerRef,
|
||||
/// A sub-CatalogManager that handles system tables
|
||||
@@ -68,18 +63,24 @@ pub struct KvBackendCatalogManager {
|
||||
table_cache: AsyncCache<String, TableRef>,
|
||||
}
|
||||
|
||||
fn make_table(table_info_value: TableInfoValue) -> CatalogResult<TableRef> {
|
||||
let table_info = table_info_value
|
||||
.table_info
|
||||
.try_into()
|
||||
.context(catalog_err::InvalidTableInfoInCatalogSnafu)?;
|
||||
Ok(DistTable::table(Arc::new(table_info)))
|
||||
struct TableCacheInvalidator {
|
||||
table_cache: AsyncCache<String, TableRef>,
|
||||
}
|
||||
|
||||
impl TableCacheInvalidator {
|
||||
pub fn new(table_cache: AsyncCache<String, TableRef>) -> Self {
|
||||
Self { table_cache }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl CacheInvalidator for KvBackendCatalogManager {
|
||||
async fn invalidate(&self, ctx: &Context, caches: Vec<CacheIdent>) -> MetaResult<()> {
|
||||
for cache in &caches {
|
||||
impl CacheInvalidator for TableCacheInvalidator {
|
||||
async fn invalidate(
|
||||
&self,
|
||||
_ctx: &Context,
|
||||
caches: Vec<CacheIdent>,
|
||||
) -> common_meta::error::Result<()> {
|
||||
for cache in caches {
|
||||
if let CacheIdent::TableName(table_name) = cache {
|
||||
let table_cache_key = format_full_table_name(
|
||||
&table_name.catalog_name,
|
||||
@@ -89,7 +90,7 @@ impl CacheInvalidator for KvBackendCatalogManager {
|
||||
self.table_cache.invalidate(&table_cache_key).await;
|
||||
}
|
||||
}
|
||||
self.cache_invalidator.invalidate(ctx, caches).await
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -99,11 +100,21 @@ const TABLE_CACHE_TTL: Duration = Duration::from_secs(10 * 60);
|
||||
const TABLE_CACHE_TTI: Duration = Duration::from_secs(5 * 60);
|
||||
|
||||
impl KvBackendCatalogManager {
|
||||
pub fn new(backend: KvBackendRef, cache_invalidator: CacheInvalidatorRef) -> Arc<Self> {
|
||||
pub async fn new(
|
||||
backend: KvBackendRef,
|
||||
multi_cache_invalidator: Arc<MultiCacheInvalidator>,
|
||||
) -> Arc<Self> {
|
||||
let table_cache: AsyncCache<String, TableRef> = CacheBuilder::new(TABLE_CACHE_MAX_CAPACITY)
|
||||
.time_to_live(TABLE_CACHE_TTL)
|
||||
.time_to_idle(TABLE_CACHE_TTI)
|
||||
.build();
|
||||
multi_cache_invalidator
|
||||
.add_invalidator(Arc::new(TableCacheInvalidator::new(table_cache.clone())))
|
||||
.await;
|
||||
|
||||
Arc::new_cyclic(|me| Self {
|
||||
partition_manager: Arc::new(PartitionRuleManager::new(backend.clone())),
|
||||
table_metadata_manager: Arc::new(TableMetadataManager::new(backend)),
|
||||
cache_invalidator,
|
||||
system_catalog: SystemCatalog {
|
||||
catalog_manager: me.clone(),
|
||||
catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY),
|
||||
@@ -112,10 +123,7 @@ impl KvBackendCatalogManager {
|
||||
me.clone(),
|
||||
)),
|
||||
},
|
||||
table_cache: CacheBuilder::new(TABLE_CACHE_MAX_CAPACITY)
|
||||
.time_to_live(TABLE_CACHE_TTL)
|
||||
.time_to_idle(TABLE_CACHE_TTI)
|
||||
.build(),
|
||||
table_cache,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -134,7 +142,7 @@ impl CatalogManager for KvBackendCatalogManager {
|
||||
self
|
||||
}
|
||||
|
||||
async fn catalog_names(&self) -> CatalogResult<Vec<String>> {
|
||||
async fn catalog_names(&self) -> Result<Vec<String>> {
|
||||
let stream = self
|
||||
.table_metadata_manager
|
||||
.catalog_manager()
|
||||
@@ -149,7 +157,7 @@ impl CatalogManager for KvBackendCatalogManager {
|
||||
Ok(keys)
|
||||
}
|
||||
|
||||
async fn schema_names(&self, catalog: &str) -> CatalogResult<Vec<String>> {
|
||||
async fn schema_names(&self, catalog: &str) -> Result<Vec<String>> {
|
||||
let stream = self
|
||||
.table_metadata_manager
|
||||
.schema_manager()
|
||||
@@ -165,7 +173,7 @@ impl CatalogManager for KvBackendCatalogManager {
|
||||
Ok(keys.into_iter().collect())
|
||||
}
|
||||
|
||||
async fn table_names(&self, catalog: &str, schema: &str) -> CatalogResult<Vec<String>> {
|
||||
async fn table_names(&self, catalog: &str, schema: &str) -> Result<Vec<String>> {
|
||||
let stream = self
|
||||
.table_metadata_manager
|
||||
.table_name_manager()
|
||||
@@ -183,7 +191,7 @@ impl CatalogManager for KvBackendCatalogManager {
|
||||
Ok(tables.into_iter().collect())
|
||||
}
|
||||
|
||||
async fn catalog_exists(&self, catalog: &str) -> CatalogResult<bool> {
|
||||
async fn catalog_exists(&self, catalog: &str) -> Result<bool> {
|
||||
self.table_metadata_manager
|
||||
.catalog_manager()
|
||||
.exists(CatalogNameKey::new(catalog))
|
||||
@@ -191,7 +199,7 @@ impl CatalogManager for KvBackendCatalogManager {
|
||||
.context(TableMetadataManagerSnafu)
|
||||
}
|
||||
|
||||
async fn schema_exists(&self, catalog: &str, schema: &str) -> CatalogResult<bool> {
|
||||
async fn schema_exists(&self, catalog: &str, schema: &str) -> Result<bool> {
|
||||
if self.system_catalog.schema_exist(schema) {
|
||||
return Ok(true);
|
||||
}
|
||||
@@ -203,7 +211,7 @@ impl CatalogManager for KvBackendCatalogManager {
|
||||
.context(TableMetadataManagerSnafu)
|
||||
}
|
||||
|
||||
async fn table_exists(&self, catalog: &str, schema: &str, table: &str) -> CatalogResult<bool> {
|
||||
async fn table_exists(&self, catalog: &str, schema: &str, table: &str) -> Result<bool> {
|
||||
if self.system_catalog.table_exist(schema, table) {
|
||||
return Ok(true);
|
||||
}
|
||||
@@ -222,7 +230,7 @@ impl CatalogManager for KvBackendCatalogManager {
|
||||
catalog: &str,
|
||||
schema: &str,
|
||||
table_name: &str,
|
||||
) -> CatalogResult<Option<TableRef>> {
|
||||
) -> Result<Option<TableRef>> {
|
||||
if let Some(table) = self.system_catalog.table(catalog, schema, table_name) {
|
||||
return Ok(Some(table));
|
||||
}
|
||||
@@ -256,7 +264,7 @@ impl CatalogManager for KvBackendCatalogManager {
|
||||
}
|
||||
.fail();
|
||||
};
|
||||
make_table(table_info_value)
|
||||
build_table(table_info_value)
|
||||
};
|
||||
|
||||
match self
|
||||
@@ -279,7 +287,7 @@ impl CatalogManager for KvBackendCatalogManager {
|
||||
&'a self,
|
||||
catalog: &'a str,
|
||||
schema: &'a str,
|
||||
) -> BoxStream<'a, CatalogResult<TableRef>> {
|
||||
) -> BoxStream<'a, Result<TableRef>> {
|
||||
let sys_tables = try_stream!({
|
||||
// System tables
|
||||
let sys_table_names = self.system_catalog.table_names(schema);
|
||||
@@ -303,7 +311,7 @@ impl CatalogManager for KvBackendCatalogManager {
|
||||
while let Some(table_ids) = table_id_chunks.next().await {
|
||||
let table_ids = table_ids
|
||||
.into_iter()
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
.collect::<std::result::Result<Vec<_>, _>>()
|
||||
.map_err(BoxedError::new)
|
||||
.context(ListTablesSnafu { catalog, schema })?;
|
||||
|
||||
@@ -315,7 +323,7 @@ impl CatalogManager for KvBackendCatalogManager {
|
||||
.context(TableMetadataManagerSnafu)?;
|
||||
|
||||
for table_info_value in table_info_values.into_values() {
|
||||
yield make_table(table_info_value)?;
|
||||
yield build_table(table_info_value)?;
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -324,6 +332,14 @@ impl CatalogManager for KvBackendCatalogManager {
|
||||
}
|
||||
}
|
||||
|
||||
fn build_table(table_info_value: TableInfoValue) -> Result<TableRef> {
|
||||
let table_info = table_info_value
|
||||
.table_info
|
||||
.try_into()
|
||||
.context(InvalidTableInfoInCatalogSnafu)?;
|
||||
Ok(DistTable::table(Arc::new(table_info)))
|
||||
}
|
||||
|
||||
// TODO: This struct can hold a static map of all system tables when
|
||||
// the upper layer (e.g., procedure) can inform the catalog manager
|
||||
// a new catalog is created.
|
||||
|
||||
@@ -22,6 +22,7 @@ use catalog::kvbackend::{
|
||||
use client::{Client, Database, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_base::Plugins;
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_meta::cache_invalidator::MultiCacheInvalidator;
|
||||
use common_query::Output;
|
||||
use common_recordbatch::RecordBatches;
|
||||
use common_telemetry::logging;
|
||||
@@ -252,9 +253,11 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {
|
||||
|
||||
let cached_meta_backend =
|
||||
Arc::new(CachedMetaKvBackendBuilder::new(meta_client.clone()).build());
|
||||
|
||||
let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::with_invalidators(vec![
|
||||
cached_meta_backend.clone(),
|
||||
]));
|
||||
let catalog_list =
|
||||
KvBackendCatalogManager::new(cached_meta_backend.clone(), cached_meta_backend);
|
||||
KvBackendCatalogManager::new(cached_meta_backend.clone(), multi_cache_invalidator).await;
|
||||
let plugins: Plugins = Default::default();
|
||||
let state = Arc::new(QueryEngineState::new(
|
||||
catalog_list,
|
||||
|
||||
@@ -19,6 +19,7 @@ use async_trait::async_trait;
|
||||
use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager};
|
||||
use clap::Parser;
|
||||
use client::client_manager::DatanodeClients;
|
||||
use common_meta::cache_invalidator::MultiCacheInvalidator;
|
||||
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
|
||||
use common_meta::heartbeat::handler::HandlerGroupExecutor;
|
||||
use common_telemetry::logging;
|
||||
@@ -247,13 +248,20 @@ impl StartCommand {
|
||||
.cache_tti(cache_tti)
|
||||
.build();
|
||||
let cached_meta_backend = Arc::new(cached_meta_backend);
|
||||
|
||||
let catalog_manager =
|
||||
KvBackendCatalogManager::new(cached_meta_backend.clone(), cached_meta_backend.clone());
|
||||
let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::with_invalidators(vec![
|
||||
cached_meta_backend.clone(),
|
||||
]));
|
||||
let catalog_manager = KvBackendCatalogManager::new(
|
||||
cached_meta_backend.clone(),
|
||||
multi_cache_invalidator.clone(),
|
||||
)
|
||||
.await;
|
||||
|
||||
let executor = HandlerGroupExecutor::new(vec![
|
||||
Arc::new(ParseMailboxMessageHandler),
|
||||
Arc::new(InvalidateTableCacheHandler::new(catalog_manager.clone())),
|
||||
Arc::new(InvalidateTableCacheHandler::new(
|
||||
multi_cache_invalidator.clone(),
|
||||
)),
|
||||
]);
|
||||
|
||||
let heartbeat_task = HeartbeatTask::new(
|
||||
@@ -264,12 +272,12 @@ impl StartCommand {
|
||||
|
||||
let mut instance = FrontendBuilder::new(
|
||||
cached_meta_backend.clone(),
|
||||
catalog_manager.clone(),
|
||||
catalog_manager,
|
||||
Arc::new(DatanodeClients::default()),
|
||||
meta_client,
|
||||
)
|
||||
.with_cache_invalidator(catalog_manager.clone())
|
||||
.with_plugin(plugins.clone())
|
||||
.with_cache_invalidator(multi_cache_invalidator)
|
||||
.with_heartbeat_task(heartbeat_task)
|
||||
.try_build()
|
||||
.await
|
||||
|
||||
@@ -20,7 +20,7 @@ use catalog::kvbackend::KvBackendCatalogManager;
|
||||
use clap::Parser;
|
||||
use common_catalog::consts::MIN_USER_TABLE_ID;
|
||||
use common_config::{metadata_store_dir, KvBackendConfig};
|
||||
use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator};
|
||||
use common_meta::cache_invalidator::{CacheInvalidatorRef, MultiCacheInvalidator};
|
||||
use common_meta::datanode_manager::DatanodeManagerRef;
|
||||
use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
|
||||
use common_meta::ddl::ProcedureExecutorRef;
|
||||
@@ -400,8 +400,9 @@ impl StartCommand {
|
||||
.await
|
||||
.context(StartFrontendSnafu)?;
|
||||
|
||||
let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::default());
|
||||
let catalog_manager =
|
||||
KvBackendCatalogManager::new(kv_backend.clone(), Arc::new(DummyCacheInvalidator));
|
||||
KvBackendCatalogManager::new(kv_backend.clone(), multi_cache_invalidator.clone()).await;
|
||||
|
||||
let builder =
|
||||
DatanodeBuilder::new(dn_opts, fe_plugins.clone()).with_kv_backend(kv_backend.clone());
|
||||
@@ -432,19 +433,18 @@ impl StartCommand {
|
||||
table_metadata_manager,
|
||||
procedure_manager.clone(),
|
||||
datanode_manager.clone(),
|
||||
catalog_manager.clone(),
|
||||
multi_cache_invalidator,
|
||||
table_meta_allocator,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut frontend = FrontendBuilder::new(
|
||||
kv_backend,
|
||||
catalog_manager.clone(),
|
||||
catalog_manager,
|
||||
datanode_manager,
|
||||
ddl_task_executor,
|
||||
)
|
||||
.with_plugin(fe_plugins.clone())
|
||||
.with_cache_invalidator(catalog_manager)
|
||||
.try_build()
|
||||
.await
|
||||
.context(StartFrontendSnafu)?;
|
||||
|
||||
@@ -14,6 +14,8 @@
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::instruction::CacheIdent;
|
||||
use crate::key::table_info::TableInfoKey;
|
||||
@@ -58,6 +60,34 @@ impl CacheInvalidator for DummyCacheInvalidator {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct MultiCacheInvalidator {
|
||||
invalidators: RwLock<Vec<CacheInvalidatorRef>>,
|
||||
}
|
||||
|
||||
impl MultiCacheInvalidator {
|
||||
pub fn with_invalidators(invalidators: Vec<CacheInvalidatorRef>) -> Self {
|
||||
Self {
|
||||
invalidators: RwLock::new(invalidators),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn add_invalidator(&self, invalidator: CacheInvalidatorRef) {
|
||||
self.invalidators.write().await.push(invalidator);
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl CacheInvalidator for MultiCacheInvalidator {
|
||||
async fn invalidate(&self, ctx: &Context, caches: Vec<CacheIdent>) -> Result<()> {
|
||||
let invalidators = self.invalidators.read().await;
|
||||
for invalidator in invalidators.iter() {
|
||||
invalidator.invalidate(ctx, caches.clone()).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<T> CacheInvalidator for T
|
||||
where
|
||||
|
||||
@@ -331,23 +331,19 @@ impl AlterTableProcedure {
|
||||
async fn on_broadcast(&mut self) -> Result<Status> {
|
||||
let alter_kind = self.alter_kind()?;
|
||||
let cache_invalidator = &self.context.cache_invalidator;
|
||||
|
||||
if matches!(alter_kind, Kind::RenameTable { .. }) {
|
||||
cache_invalidator
|
||||
.invalidate(
|
||||
&Context::default(),
|
||||
vec![CacheIdent::TableName(self.data.table_ref().into())],
|
||||
)
|
||||
.await?;
|
||||
let cache_keys = if matches!(alter_kind, Kind::RenameTable { .. }) {
|
||||
vec![CacheIdent::TableName(self.data.table_ref().into())]
|
||||
} else {
|
||||
cache_invalidator
|
||||
.invalidate(
|
||||
&Context::default(),
|
||||
vec![CacheIdent::TableId(self.data.table_id())],
|
||||
)
|
||||
.await?;
|
||||
vec![
|
||||
CacheIdent::TableId(self.data.table_id()),
|
||||
CacheIdent::TableName(self.data.table_ref().into()),
|
||||
]
|
||||
};
|
||||
|
||||
cache_invalidator
|
||||
.invalidate(&Context::default(), cache_keys)
|
||||
.await?;
|
||||
|
||||
Ok(Status::done())
|
||||
}
|
||||
|
||||
|
||||
@@ -25,6 +25,7 @@ use client::client_manager::DatanodeClients;
|
||||
use client::Client;
|
||||
use common_base::Plugins;
|
||||
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
|
||||
use common_meta::cache_invalidator::MultiCacheInvalidator;
|
||||
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
|
||||
use common_meta::heartbeat::handler::HandlerGroupExecutor;
|
||||
use common_meta::kv_backend::chroot::ChrootKvBackend;
|
||||
@@ -352,13 +353,20 @@ impl GreptimeDbClusterBuilder {
|
||||
|
||||
let cached_meta_backend =
|
||||
Arc::new(CachedMetaKvBackendBuilder::new(meta_client.clone()).build());
|
||||
|
||||
let catalog_manager =
|
||||
KvBackendCatalogManager::new(cached_meta_backend.clone(), cached_meta_backend.clone());
|
||||
let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::with_invalidators(vec![
|
||||
cached_meta_backend.clone(),
|
||||
]));
|
||||
let catalog_manager = KvBackendCatalogManager::new(
|
||||
cached_meta_backend.clone(),
|
||||
multi_cache_invalidator.clone(),
|
||||
)
|
||||
.await;
|
||||
|
||||
let handlers_executor = HandlerGroupExecutor::new(vec![
|
||||
Arc::new(ParseMailboxMessageHandler),
|
||||
Arc::new(InvalidateTableCacheHandler::new(catalog_manager.clone())),
|
||||
Arc::new(InvalidateTableCacheHandler::new(
|
||||
multi_cache_invalidator.clone(),
|
||||
)),
|
||||
]);
|
||||
|
||||
let heartbeat_task = HeartbeatTask::new(
|
||||
@@ -369,11 +377,11 @@ impl GreptimeDbClusterBuilder {
|
||||
|
||||
let instance = FrontendBuilder::new(
|
||||
cached_meta_backend.clone(),
|
||||
catalog_manager.clone(),
|
||||
catalog_manager,
|
||||
datanode_clients,
|
||||
meta_client,
|
||||
)
|
||||
.with_cache_invalidator(catalog_manager)
|
||||
.with_cache_invalidator(multi_cache_invalidator)
|
||||
.with_heartbeat_task(heartbeat_task)
|
||||
.try_build()
|
||||
.await
|
||||
|
||||
@@ -19,7 +19,7 @@ use cmd::options::MixOptions;
|
||||
use common_base::Plugins;
|
||||
use common_catalog::consts::MIN_USER_TABLE_ID;
|
||||
use common_config::KvBackendConfig;
|
||||
use common_meta::cache_invalidator::DummyCacheInvalidator;
|
||||
use common_meta::cache_invalidator::MultiCacheInvalidator;
|
||||
use common_meta::ddl::table_meta::TableMetadataAllocator;
|
||||
use common_meta::ddl_manager::DdlManager;
|
||||
use common_meta::key::TableMetadataManager;
|
||||
@@ -126,8 +126,9 @@ impl GreptimeDbStandaloneBuilder {
|
||||
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
|
||||
table_metadata_manager.init().await.unwrap();
|
||||
|
||||
let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::default());
|
||||
let catalog_manager =
|
||||
KvBackendCatalogManager::new(kv_backend.clone(), Arc::new(DummyCacheInvalidator));
|
||||
KvBackendCatalogManager::new(kv_backend.clone(), multi_cache_invalidator.clone()).await;
|
||||
|
||||
let datanode_manager = Arc::new(StandaloneDatanodeManager(datanode.region_server()));
|
||||
|
||||
@@ -150,7 +151,7 @@ impl GreptimeDbStandaloneBuilder {
|
||||
DdlManager::try_new(
|
||||
procedure_manager.clone(),
|
||||
datanode_manager.clone(),
|
||||
Arc::new(DummyCacheInvalidator),
|
||||
multi_cache_invalidator,
|
||||
table_metadata_manager,
|
||||
table_meta_allocator,
|
||||
Arc::new(MemoryRegionKeeper::default()),
|
||||
@@ -161,12 +162,11 @@ impl GreptimeDbStandaloneBuilder {
|
||||
|
||||
let instance = FrontendBuilder::new(
|
||||
kv_backend.clone(),
|
||||
catalog_manager.clone(),
|
||||
catalog_manager,
|
||||
datanode_manager,
|
||||
ddl_task_executor,
|
||||
)
|
||||
.with_plugin(plugins)
|
||||
.with_cache_invalidator(catalog_manager)
|
||||
.try_build()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
Reference in New Issue
Block a user