From f49cd0ca181d1267e1a1a7598529139f34be2137 Mon Sep 17 00:00:00 2001 From: JeremyHi Date: Fri, 29 Mar 2024 15:33:51 +0800 Subject: [PATCH] refactor: cache invalidator (#3611) * chore: remove some alias * refactor: cache invalidator --- src/catalog/src/kvbackend/manager.rs | 86 ++++++++++++++---------- src/cmd/src/cli/repl.rs | 7 +- src/cmd/src/frontend.rs | 20 ++++-- src/cmd/src/standalone.rs | 10 +-- src/common/meta/src/cache_invalidator.rs | 30 +++++++++ src/common/meta/src/ddl/alter_table.rs | 24 +++---- tests-integration/src/cluster.rs | 20 ++++-- tests-integration/src/standalone.rs | 10 +-- 8 files changed, 134 insertions(+), 73 deletions(-) diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs index 327a081809..f08362fe70 100644 --- a/src/catalog/src/kvbackend/manager.rs +++ b/src/catalog/src/kvbackend/manager.rs @@ -23,8 +23,7 @@ use common_catalog::consts::{ }; use common_catalog::format_full_table_name; use common_error::ext::BoxedError; -use common_meta::cache_invalidator::{CacheInvalidator, CacheInvalidatorRef, Context}; -use common_meta::error::Result as MetaResult; +use common_meta::cache_invalidator::{CacheInvalidator, Context, MultiCacheInvalidator}; use common_meta::instruction::CacheIdent; use common_meta::key::catalog_name::CatalogNameKey; use common_meta::key::schema_name::SchemaNameKey; @@ -44,8 +43,8 @@ use table::TableRef; use crate::error::Error::{GetTableCache, TableCacheNotGet}; use crate::error::{ - self as catalog_err, ListCatalogsSnafu, ListSchemasSnafu, ListTablesSnafu, - Result as CatalogResult, TableCacheNotGetSnafu, TableMetadataManagerSnafu, + InvalidTableInfoInCatalogSnafu, ListCatalogsSnafu, ListSchemasSnafu, ListTablesSnafu, Result, + TableCacheNotGetSnafu, TableMetadataManagerSnafu, }; use crate::information_schema::InformationSchemaProvider; use crate::CatalogManager; @@ -57,10 +56,6 @@ use crate::CatalogManager; /// comes from `SystemCatalog`, which is static and read-only. #[derive(Clone)] pub struct KvBackendCatalogManager { - // TODO(LFC): Maybe use a real implementation for Standalone mode. - // Now we use `NoopKvCacheInvalidator` for Standalone mode. In Standalone mode, the KV backend - // is implemented by RaftEngine. Maybe we need a cache for it? - cache_invalidator: CacheInvalidatorRef, partition_manager: PartitionRuleManagerRef, table_metadata_manager: TableMetadataManagerRef, /// A sub-CatalogManager that handles system tables @@ -68,18 +63,24 @@ pub struct KvBackendCatalogManager { table_cache: AsyncCache, } -fn make_table(table_info_value: TableInfoValue) -> CatalogResult { - let table_info = table_info_value - .table_info - .try_into() - .context(catalog_err::InvalidTableInfoInCatalogSnafu)?; - Ok(DistTable::table(Arc::new(table_info))) +struct TableCacheInvalidator { + table_cache: AsyncCache, +} + +impl TableCacheInvalidator { + pub fn new(table_cache: AsyncCache) -> Self { + Self { table_cache } + } } #[async_trait::async_trait] -impl CacheInvalidator for KvBackendCatalogManager { - async fn invalidate(&self, ctx: &Context, caches: Vec) -> MetaResult<()> { - for cache in &caches { +impl CacheInvalidator for TableCacheInvalidator { + async fn invalidate( + &self, + _ctx: &Context, + caches: Vec, + ) -> common_meta::error::Result<()> { + for cache in caches { if let CacheIdent::TableName(table_name) = cache { let table_cache_key = format_full_table_name( &table_name.catalog_name, @@ -89,7 +90,7 @@ impl CacheInvalidator for KvBackendCatalogManager { self.table_cache.invalidate(&table_cache_key).await; } } - self.cache_invalidator.invalidate(ctx, caches).await + Ok(()) } } @@ -99,11 +100,21 @@ const TABLE_CACHE_TTL: Duration = Duration::from_secs(10 * 60); const TABLE_CACHE_TTI: Duration = Duration::from_secs(5 * 60); impl KvBackendCatalogManager { - pub fn new(backend: KvBackendRef, cache_invalidator: CacheInvalidatorRef) -> Arc { + pub async fn new( + backend: KvBackendRef, + multi_cache_invalidator: Arc, + ) -> Arc { + let table_cache: AsyncCache = CacheBuilder::new(TABLE_CACHE_MAX_CAPACITY) + .time_to_live(TABLE_CACHE_TTL) + .time_to_idle(TABLE_CACHE_TTI) + .build(); + multi_cache_invalidator + .add_invalidator(Arc::new(TableCacheInvalidator::new(table_cache.clone()))) + .await; + Arc::new_cyclic(|me| Self { partition_manager: Arc::new(PartitionRuleManager::new(backend.clone())), table_metadata_manager: Arc::new(TableMetadataManager::new(backend)), - cache_invalidator, system_catalog: SystemCatalog { catalog_manager: me.clone(), catalog_cache: Cache::new(CATALOG_CACHE_MAX_CAPACITY), @@ -112,10 +123,7 @@ impl KvBackendCatalogManager { me.clone(), )), }, - table_cache: CacheBuilder::new(TABLE_CACHE_MAX_CAPACITY) - .time_to_live(TABLE_CACHE_TTL) - .time_to_idle(TABLE_CACHE_TTI) - .build(), + table_cache, }) } @@ -134,7 +142,7 @@ impl CatalogManager for KvBackendCatalogManager { self } - async fn catalog_names(&self) -> CatalogResult> { + async fn catalog_names(&self) -> Result> { let stream = self .table_metadata_manager .catalog_manager() @@ -149,7 +157,7 @@ impl CatalogManager for KvBackendCatalogManager { Ok(keys) } - async fn schema_names(&self, catalog: &str) -> CatalogResult> { + async fn schema_names(&self, catalog: &str) -> Result> { let stream = self .table_metadata_manager .schema_manager() @@ -165,7 +173,7 @@ impl CatalogManager for KvBackendCatalogManager { Ok(keys.into_iter().collect()) } - async fn table_names(&self, catalog: &str, schema: &str) -> CatalogResult> { + async fn table_names(&self, catalog: &str, schema: &str) -> Result> { let stream = self .table_metadata_manager .table_name_manager() @@ -183,7 +191,7 @@ impl CatalogManager for KvBackendCatalogManager { Ok(tables.into_iter().collect()) } - async fn catalog_exists(&self, catalog: &str) -> CatalogResult { + async fn catalog_exists(&self, catalog: &str) -> Result { self.table_metadata_manager .catalog_manager() .exists(CatalogNameKey::new(catalog)) @@ -191,7 +199,7 @@ impl CatalogManager for KvBackendCatalogManager { .context(TableMetadataManagerSnafu) } - async fn schema_exists(&self, catalog: &str, schema: &str) -> CatalogResult { + async fn schema_exists(&self, catalog: &str, schema: &str) -> Result { if self.system_catalog.schema_exist(schema) { return Ok(true); } @@ -203,7 +211,7 @@ impl CatalogManager for KvBackendCatalogManager { .context(TableMetadataManagerSnafu) } - async fn table_exists(&self, catalog: &str, schema: &str, table: &str) -> CatalogResult { + async fn table_exists(&self, catalog: &str, schema: &str, table: &str) -> Result { if self.system_catalog.table_exist(schema, table) { return Ok(true); } @@ -222,7 +230,7 @@ impl CatalogManager for KvBackendCatalogManager { catalog: &str, schema: &str, table_name: &str, - ) -> CatalogResult> { + ) -> Result> { if let Some(table) = self.system_catalog.table(catalog, schema, table_name) { return Ok(Some(table)); } @@ -256,7 +264,7 @@ impl CatalogManager for KvBackendCatalogManager { } .fail(); }; - make_table(table_info_value) + build_table(table_info_value) }; match self @@ -279,7 +287,7 @@ impl CatalogManager for KvBackendCatalogManager { &'a self, catalog: &'a str, schema: &'a str, - ) -> BoxStream<'a, CatalogResult> { + ) -> BoxStream<'a, Result> { let sys_tables = try_stream!({ // System tables let sys_table_names = self.system_catalog.table_names(schema); @@ -303,7 +311,7 @@ impl CatalogManager for KvBackendCatalogManager { while let Some(table_ids) = table_id_chunks.next().await { let table_ids = table_ids .into_iter() - .collect::, _>>() + .collect::, _>>() .map_err(BoxedError::new) .context(ListTablesSnafu { catalog, schema })?; @@ -315,7 +323,7 @@ impl CatalogManager for KvBackendCatalogManager { .context(TableMetadataManagerSnafu)?; for table_info_value in table_info_values.into_values() { - yield make_table(table_info_value)?; + yield build_table(table_info_value)?; } } }); @@ -324,6 +332,14 @@ impl CatalogManager for KvBackendCatalogManager { } } +fn build_table(table_info_value: TableInfoValue) -> Result { + let table_info = table_info_value + .table_info + .try_into() + .context(InvalidTableInfoInCatalogSnafu)?; + Ok(DistTable::table(Arc::new(table_info))) +} + // TODO: This struct can hold a static map of all system tables when // the upper layer (e.g., procedure) can inform the catalog manager // a new catalog is created. diff --git a/src/cmd/src/cli/repl.rs b/src/cmd/src/cli/repl.rs index 63f04ee5ed..c622aa4621 100644 --- a/src/cmd/src/cli/repl.rs +++ b/src/cmd/src/cli/repl.rs @@ -22,6 +22,7 @@ use catalog::kvbackend::{ use client::{Client, Database, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_base::Plugins; use common_error::ext::ErrorExt; +use common_meta::cache_invalidator::MultiCacheInvalidator; use common_query::Output; use common_recordbatch::RecordBatches; use common_telemetry::logging; @@ -252,9 +253,11 @@ async fn create_query_engine(meta_addr: &str) -> Result { let cached_meta_backend = Arc::new(CachedMetaKvBackendBuilder::new(meta_client.clone()).build()); - + let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::with_invalidators(vec![ + cached_meta_backend.clone(), + ])); let catalog_list = - KvBackendCatalogManager::new(cached_meta_backend.clone(), cached_meta_backend); + KvBackendCatalogManager::new(cached_meta_backend.clone(), multi_cache_invalidator).await; let plugins: Plugins = Default::default(); let state = Arc::new(QueryEngineState::new( catalog_list, diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index 39c7982eae..3100182f72 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -19,6 +19,7 @@ use async_trait::async_trait; use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager}; use clap::Parser; use client::client_manager::DatanodeClients; +use common_meta::cache_invalidator::MultiCacheInvalidator; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; use common_meta::heartbeat::handler::HandlerGroupExecutor; use common_telemetry::logging; @@ -247,13 +248,20 @@ impl StartCommand { .cache_tti(cache_tti) .build(); let cached_meta_backend = Arc::new(cached_meta_backend); - - let catalog_manager = - KvBackendCatalogManager::new(cached_meta_backend.clone(), cached_meta_backend.clone()); + let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::with_invalidators(vec![ + cached_meta_backend.clone(), + ])); + let catalog_manager = KvBackendCatalogManager::new( + cached_meta_backend.clone(), + multi_cache_invalidator.clone(), + ) + .await; let executor = HandlerGroupExecutor::new(vec![ Arc::new(ParseMailboxMessageHandler), - Arc::new(InvalidateTableCacheHandler::new(catalog_manager.clone())), + Arc::new(InvalidateTableCacheHandler::new( + multi_cache_invalidator.clone(), + )), ]); let heartbeat_task = HeartbeatTask::new( @@ -264,12 +272,12 @@ impl StartCommand { let mut instance = FrontendBuilder::new( cached_meta_backend.clone(), - catalog_manager.clone(), + catalog_manager, Arc::new(DatanodeClients::default()), meta_client, ) - .with_cache_invalidator(catalog_manager.clone()) .with_plugin(plugins.clone()) + .with_cache_invalidator(multi_cache_invalidator) .with_heartbeat_task(heartbeat_task) .try_build() .await diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 5d508adbf6..694d10d689 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -20,7 +20,7 @@ use catalog::kvbackend::KvBackendCatalogManager; use clap::Parser; use common_catalog::consts::MIN_USER_TABLE_ID; use common_config::{metadata_store_dir, KvBackendConfig}; -use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator}; +use common_meta::cache_invalidator::{CacheInvalidatorRef, MultiCacheInvalidator}; use common_meta::datanode_manager::DatanodeManagerRef; use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef}; use common_meta::ddl::ProcedureExecutorRef; @@ -400,8 +400,9 @@ impl StartCommand { .await .context(StartFrontendSnafu)?; + let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::default()); let catalog_manager = - KvBackendCatalogManager::new(kv_backend.clone(), Arc::new(DummyCacheInvalidator)); + KvBackendCatalogManager::new(kv_backend.clone(), multi_cache_invalidator.clone()).await; let builder = DatanodeBuilder::new(dn_opts, fe_plugins.clone()).with_kv_backend(kv_backend.clone()); @@ -432,19 +433,18 @@ impl StartCommand { table_metadata_manager, procedure_manager.clone(), datanode_manager.clone(), - catalog_manager.clone(), + multi_cache_invalidator, table_meta_allocator, ) .await?; let mut frontend = FrontendBuilder::new( kv_backend, - catalog_manager.clone(), + catalog_manager, datanode_manager, ddl_task_executor, ) .with_plugin(fe_plugins.clone()) - .with_cache_invalidator(catalog_manager) .try_build() .await .context(StartFrontendSnafu)?; diff --git a/src/common/meta/src/cache_invalidator.rs b/src/common/meta/src/cache_invalidator.rs index 7eed7f0139..0713807cf7 100644 --- a/src/common/meta/src/cache_invalidator.rs +++ b/src/common/meta/src/cache_invalidator.rs @@ -14,6 +14,8 @@ use std::sync::Arc; +use tokio::sync::RwLock; + use crate::error::Result; use crate::instruction::CacheIdent; use crate::key::table_info::TableInfoKey; @@ -58,6 +60,34 @@ impl CacheInvalidator for DummyCacheInvalidator { } } +#[derive(Default)] +pub struct MultiCacheInvalidator { + invalidators: RwLock>, +} + +impl MultiCacheInvalidator { + pub fn with_invalidators(invalidators: Vec) -> Self { + Self { + invalidators: RwLock::new(invalidators), + } + } + + pub async fn add_invalidator(&self, invalidator: CacheInvalidatorRef) { + self.invalidators.write().await.push(invalidator); + } +} + +#[async_trait::async_trait] +impl CacheInvalidator for MultiCacheInvalidator { + async fn invalidate(&self, ctx: &Context, caches: Vec) -> Result<()> { + let invalidators = self.invalidators.read().await; + for invalidator in invalidators.iter() { + invalidator.invalidate(ctx, caches.clone()).await?; + } + Ok(()) + } +} + #[async_trait::async_trait] impl CacheInvalidator for T where diff --git a/src/common/meta/src/ddl/alter_table.rs b/src/common/meta/src/ddl/alter_table.rs index e1bec25cd0..1f77c1a678 100644 --- a/src/common/meta/src/ddl/alter_table.rs +++ b/src/common/meta/src/ddl/alter_table.rs @@ -331,23 +331,19 @@ impl AlterTableProcedure { async fn on_broadcast(&mut self) -> Result { let alter_kind = self.alter_kind()?; let cache_invalidator = &self.context.cache_invalidator; - - if matches!(alter_kind, Kind::RenameTable { .. }) { - cache_invalidator - .invalidate( - &Context::default(), - vec![CacheIdent::TableName(self.data.table_ref().into())], - ) - .await?; + let cache_keys = if matches!(alter_kind, Kind::RenameTable { .. }) { + vec![CacheIdent::TableName(self.data.table_ref().into())] } else { - cache_invalidator - .invalidate( - &Context::default(), - vec![CacheIdent::TableId(self.data.table_id())], - ) - .await?; + vec![ + CacheIdent::TableId(self.data.table_id()), + CacheIdent::TableName(self.data.table_ref().into()), + ] }; + cache_invalidator + .invalidate(&Context::default(), cache_keys) + .await?; + Ok(Status::done()) } diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 600eab950f..4ca617b2ab 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -25,6 +25,7 @@ use client::client_manager::DatanodeClients; use client::Client; use common_base::Plugins; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; +use common_meta::cache_invalidator::MultiCacheInvalidator; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; use common_meta::heartbeat::handler::HandlerGroupExecutor; use common_meta::kv_backend::chroot::ChrootKvBackend; @@ -352,13 +353,20 @@ impl GreptimeDbClusterBuilder { let cached_meta_backend = Arc::new(CachedMetaKvBackendBuilder::new(meta_client.clone()).build()); - - let catalog_manager = - KvBackendCatalogManager::new(cached_meta_backend.clone(), cached_meta_backend.clone()); + let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::with_invalidators(vec![ + cached_meta_backend.clone(), + ])); + let catalog_manager = KvBackendCatalogManager::new( + cached_meta_backend.clone(), + multi_cache_invalidator.clone(), + ) + .await; let handlers_executor = HandlerGroupExecutor::new(vec![ Arc::new(ParseMailboxMessageHandler), - Arc::new(InvalidateTableCacheHandler::new(catalog_manager.clone())), + Arc::new(InvalidateTableCacheHandler::new( + multi_cache_invalidator.clone(), + )), ]); let heartbeat_task = HeartbeatTask::new( @@ -369,11 +377,11 @@ impl GreptimeDbClusterBuilder { let instance = FrontendBuilder::new( cached_meta_backend.clone(), - catalog_manager.clone(), + catalog_manager, datanode_clients, meta_client, ) - .with_cache_invalidator(catalog_manager) + .with_cache_invalidator(multi_cache_invalidator) .with_heartbeat_task(heartbeat_task) .try_build() .await diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 5360f758c8..6b4340db27 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -19,7 +19,7 @@ use cmd::options::MixOptions; use common_base::Plugins; use common_catalog::consts::MIN_USER_TABLE_ID; use common_config::KvBackendConfig; -use common_meta::cache_invalidator::DummyCacheInvalidator; +use common_meta::cache_invalidator::MultiCacheInvalidator; use common_meta::ddl::table_meta::TableMetadataAllocator; use common_meta::ddl_manager::DdlManager; use common_meta::key::TableMetadataManager; @@ -126,8 +126,9 @@ impl GreptimeDbStandaloneBuilder { let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); table_metadata_manager.init().await.unwrap(); + let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::default()); let catalog_manager = - KvBackendCatalogManager::new(kv_backend.clone(), Arc::new(DummyCacheInvalidator)); + KvBackendCatalogManager::new(kv_backend.clone(), multi_cache_invalidator.clone()).await; let datanode_manager = Arc::new(StandaloneDatanodeManager(datanode.region_server())); @@ -150,7 +151,7 @@ impl GreptimeDbStandaloneBuilder { DdlManager::try_new( procedure_manager.clone(), datanode_manager.clone(), - Arc::new(DummyCacheInvalidator), + multi_cache_invalidator, table_metadata_manager, table_meta_allocator, Arc::new(MemoryRegionKeeper::default()), @@ -161,12 +162,11 @@ impl GreptimeDbStandaloneBuilder { let instance = FrontendBuilder::new( kv_backend.clone(), - catalog_manager.clone(), + catalog_manager, datanode_manager, ddl_task_executor, ) .with_plugin(plugins) - .with_cache_invalidator(catalog_manager) .try_build() .await .unwrap();