diff --git a/Cargo.lock b/Cargo.lock index 46597f9e3d..d5eba131e7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1218,6 +1218,15 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "cache" +version = "0.7.2" +dependencies = [ + "catalog", + "common-meta", + "moka", +] + [[package]] name = "cactus" version = "1.0.7" @@ -1592,6 +1601,7 @@ dependencies = [ "async-trait", "auth", "base64 0.21.7", + "cache", "catalog", "chrono", "clap 4.5.4", @@ -1624,6 +1634,7 @@ dependencies = [ "meta-client", "meta-srv", "mito2", + "moka", "nu-ansi-term", "plugins", "prometheus", @@ -3581,6 +3592,7 @@ dependencies = [ "arc-swap", "async-trait", "auth", + "cache", "catalog", "client", "common-base", @@ -10294,6 +10306,7 @@ dependencies = [ "async-trait", "auth", "axum", + "cache", "catalog", "chrono", "clap 4.5.4", @@ -10323,6 +10336,7 @@ dependencies = [ "itertools 0.10.5", "meta-client", "meta-srv", + "moka", "mysql_async", "object-store", "opentelemetry-proto 0.5.0", diff --git a/Cargo.toml b/Cargo.toml index 1521b7930b..c48a4fc5e5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ members = [ "src/api", "src/auth", "src/catalog", + "src/cache", "src/client", "src/cmd", "src/common/base", @@ -173,6 +174,7 @@ zstd = "0.13" ## workspaces members api = { path = "src/api" } auth = { path = "src/auth" } +cache = { path = "src/cache" } catalog = { path = "src/catalog" } client = { path = "src/client" } cmd = { path = "src/cmd" } diff --git a/src/cache/Cargo.toml b/src/cache/Cargo.toml new file mode 100644 index 0000000000..b372018c58 --- /dev/null +++ b/src/cache/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "cache" +version.workspace = true +edition.workspace = true +license.workspace = true + +[dependencies] +catalog.workspace = true +common-meta.workspace = true +moka.workspace = true diff --git a/src/cache/src/lib.rs b/src/cache/src/lib.rs new file mode 100644 index 0000000000..dcef963249 --- /dev/null +++ b/src/cache/src/lib.rs @@ -0,0 +1,86 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; +use std::time::Duration; + +use catalog::kvbackend::new_table_cache; +use common_meta::cache::{ + new_table_flownode_set_cache, new_table_info_cache, new_table_name_cache, CacheRegistryBuilder, +}; +use common_meta::kv_backend::KvBackendRef; +use moka::future::CacheBuilder; + +const DEFAULT_CACHE_MAX_CAPACITY: u64 = 65536; +const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(10 * 60); +const DEFAULT_CACHE_TTI: Duration = Duration::from_secs(5 * 60); + +pub const TABLE_INFO_CACHE_NAME: &str = "table_info_cache"; +pub const TABLE_NAME_CACHE_NAME: &str = "table_name_cache"; +pub const TABLE_CACHE_NAME: &str = "table_cache"; +pub const TABLE_FLOWNODE_SET_CACHE_NAME: &str = "table_flownode_set_cache"; + +// TODO(weny): Make the cache configurable. 
+pub fn default_cache_registry_builder(kv_backend: KvBackendRef) -> CacheRegistryBuilder { + // Builds table info cache + let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY) + .time_to_live(DEFAULT_CACHE_TTL) + .time_to_idle(DEFAULT_CACHE_TTI) + .build(); + let table_info_cache = Arc::new(new_table_info_cache( + TABLE_INFO_CACHE_NAME.to_string(), + cache, + kv_backend.clone(), + )); + + // Builds table name cache + let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY) + .time_to_live(DEFAULT_CACHE_TTL) + .time_to_idle(DEFAULT_CACHE_TTI) + .build(); + let table_name_cache = Arc::new(new_table_name_cache( + TABLE_NAME_CACHE_NAME.to_string(), + cache, + kv_backend.clone(), + )); + + // Builds table cache + let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY) + .time_to_live(DEFAULT_CACHE_TTL) + .time_to_idle(DEFAULT_CACHE_TTI) + .build(); + let table_cache = Arc::new(new_table_cache( + TABLE_CACHE_NAME.to_string(), + cache, + table_info_cache.clone(), + table_name_cache.clone(), + )); + + // Builds table flownode set cache + let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY) + .time_to_live(DEFAULT_CACHE_TTL) + .time_to_idle(DEFAULT_CACHE_TTI) + .build(); + let table_flownode_set_cache = Arc::new(new_table_flownode_set_cache( + TABLE_FLOWNODE_SET_CACHE_NAME.to_string(), + cache, + kv_backend.clone(), + )); + + CacheRegistryBuilder::default() + .add_cache(table_info_cache) + .add_cache(table_name_cache) + .add_cache(table_cache) + .add_cache(table_flownode_set_cache) +} diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index 2d3a908308..5834eaed35 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -318,15 +318,12 @@ pub enum Error { location: Location, }, - #[snafu(display("Get null from table cache, key: {}", key))] - TableCacheNotGet { - key: String, + #[snafu(display("Failed to get table cache"))] + GetTableCache { + source: common_meta::error::Error, #[snafu(implicit)] location: Location, }, - - #[snafu(display("Failed to get table cache, err: {}", err_msg))] - GetTableCache { err_msg: String }, } pub type Result = std::result::Result; @@ -388,7 +385,7 @@ impl ErrorExt for Error { Error::QueryAccessDenied { .. } => StatusCode::AccessDenied, Error::Datafusion { .. } => StatusCode::EngineExecuteQuery, Error::TableMetadataManager { source, .. } => source.status_code(), - Error::TableCacheNotGet { .. } | Error::GetTableCache { .. } => StatusCode::Internal, + Error::GetTableCache { .. } => StatusCode::Internal, } } diff --git a/src/catalog/src/kvbackend.rs b/src/catalog/src/kvbackend.rs index 6a1aa9e2ac..4414b091c3 100644 --- a/src/catalog/src/kvbackend.rs +++ b/src/catalog/src/kvbackend.rs @@ -16,5 +16,7 @@ pub use client::{CachedMetaKvBackend, CachedMetaKvBackendBuilder, MetaKvBackend} mod client; mod manager; +mod table_cache; pub use manager::KvBackendCatalogManager; +pub use table_cache::{new_table_cache, TableCache, TableCacheRef}; diff --git a/src/catalog/src/kvbackend/client.rs b/src/catalog/src/kvbackend/client.rs index e72d2b5275..12f713757a 100644 --- a/src/catalog/src/kvbackend/client.rs +++ b/src/catalog/src/kvbackend/client.rs @@ -350,6 +350,13 @@ pub struct MetaKvBackend { pub client: Arc, } +impl MetaKvBackend { + /// Constructs a [MetaKvBackend]. 
+ pub fn new(client: Arc) -> MetaKvBackend { + MetaKvBackend { client } + } +} + impl TxnService for MetaKvBackend { type Error = Error; } diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs index 19b5993916..69af7eeec4 100644 --- a/src/catalog/src/kvbackend/manager.rs +++ b/src/catalog/src/kvbackend/manager.rs @@ -15,27 +15,23 @@ use std::any::Any; use std::collections::BTreeSet; use std::sync::{Arc, Weak}; -use std::time::Duration; use async_stream::try_stream; use common_catalog::consts::{ DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID, }; -use common_catalog::format_full_table_name; use common_config::Mode; use common_error::ext::BoxedError; -use common_meta::cache_invalidator::{CacheInvalidator, Context, MultiCacheInvalidator}; -use common_meta::instruction::CacheIdent; use common_meta::key::catalog_name::CatalogNameKey; use common_meta::key::schema_name::SchemaNameKey; use common_meta::key::table_info::TableInfoValue; use common_meta::key::table_name::TableNameKey; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; +use common_meta::table_name::TableName; use futures_util::stream::BoxStream; use futures_util::{StreamExt, TryStreamExt}; use meta_client::client::MetaClient; -use moka::future::{Cache as AsyncCache, CacheBuilder}; use moka::sync::Cache; use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef}; use snafu::prelude::*; @@ -43,12 +39,12 @@ use table::dist_table::DistTable; use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME}; use table::TableRef; -use crate::error::Error::{GetTableCache, TableCacheNotGet}; use crate::error::{ - InvalidTableInfoInCatalogSnafu, ListCatalogsSnafu, ListSchemasSnafu, ListTablesSnafu, Result, - TableCacheNotGetSnafu, TableMetadataManagerSnafu, + GetTableCacheSnafu, InvalidTableInfoInCatalogSnafu, ListCatalogsSnafu, ListSchemasSnafu, + ListTablesSnafu, Result, TableMetadataManagerSnafu, }; use crate::information_schema::InformationSchemaProvider; +use crate::kvbackend::TableCacheRef; use crate::CatalogManager; /// Access all existing catalog, schema and tables. 
@@ -64,60 +60,18 @@ pub struct KvBackendCatalogManager { table_metadata_manager: TableMetadataManagerRef, /// A sub-CatalogManager that handles system tables system_catalog: SystemCatalog, - table_cache: AsyncCache, -} - -struct TableCacheInvalidator { - table_cache: AsyncCache, -} - -impl TableCacheInvalidator { - pub fn new(table_cache: AsyncCache) -> Self { - Self { table_cache } - } -} - -#[async_trait::async_trait] -impl CacheInvalidator for TableCacheInvalidator { - async fn invalidate( - &self, - _ctx: &Context, - caches: &[CacheIdent], - ) -> common_meta::error::Result<()> { - for cache in caches { - if let CacheIdent::TableName(table_name) = cache { - let table_cache_key = format_full_table_name( - &table_name.catalog_name, - &table_name.schema_name, - &table_name.table_name, - ); - self.table_cache.invalidate(&table_cache_key).await; - } - } - Ok(()) - } + table_cache: TableCacheRef, } const CATALOG_CACHE_MAX_CAPACITY: u64 = 128; -const TABLE_CACHE_MAX_CAPACITY: u64 = 65536; -const TABLE_CACHE_TTL: Duration = Duration::from_secs(10 * 60); -const TABLE_CACHE_TTI: Duration = Duration::from_secs(5 * 60); impl KvBackendCatalogManager { pub async fn new( mode: Mode, meta_client: Option>, backend: KvBackendRef, - multi_cache_invalidator: Arc, + table_cache: TableCacheRef, ) -> Arc { - let table_cache: AsyncCache = CacheBuilder::new(TABLE_CACHE_MAX_CAPACITY) - .time_to_live(TABLE_CACHE_TTL) - .time_to_idle(TABLE_CACHE_TTI) - .build(); - multi_cache_invalidator - .add_invalidator(Arc::new(TableCacheInvalidator::new(table_cache.clone()))) - .await; - Arc::new_cyclic(|me| Self { mode, meta_client, @@ -245,60 +199,25 @@ impl CatalogManager for KvBackendCatalogManager { async fn table( &self, - catalog: &str, - schema: &str, + catalog_name: &str, + schema_name: &str, table_name: &str, ) -> Result> { - if let Some(table) = self.system_catalog.table(catalog, schema, table_name) { + if let Some(table) = self + .system_catalog + .table(catalog_name, schema_name, table_name) + { return Ok(Some(table)); } - let init = async { - let table_name_key = TableNameKey::new(catalog, schema, table_name); - let Some(table_name_value) = self - .table_metadata_manager - .table_name_manager() - .get(table_name_key) - .await - .context(TableMetadataManagerSnafu)? - else { - return TableCacheNotGetSnafu { - key: table_name_key.to_string(), - } - .fail(); - }; - let table_id = table_name_value.table_id(); - - let Some(table_info_value) = self - .table_metadata_manager - .table_info_manager() - .get(table_id) - .await - .context(TableMetadataManagerSnafu)? - .map(|v| v.into_inner()) - else { - return TableCacheNotGetSnafu { - key: table_name_key.to_string(), - } - .fail(); - }; - build_table(table_info_value) - }; - - match self - .table_cache - .try_get_with_by_ref(&format_full_table_name(catalog, schema, table_name), init) + self.table_cache + .get_by_ref(&TableName { + catalog_name: catalog_name.to_string(), + schema_name: schema_name.to_string(), + table_name: table_name.to_string(), + }) .await - { - Ok(table) => Ok(Some(table)), - Err(err) => match err.as_ref() { - TableCacheNotGet { .. 
} => Ok(None), - _ => Err(err), - }, - } - .map_err(|err| GetTableCache { - err_msg: err.to_string(), - }) + .context(GetTableCacheSnafu) } fn tables<'a>(&'a self, catalog: &'a str, schema: &'a str) -> BoxStream<'a, Result> { diff --git a/src/catalog/src/kvbackend/table_cache.rs b/src/catalog/src/kvbackend/table_cache.rs new file mode 100644 index 0000000000..4ab73bf9d2 --- /dev/null +++ b/src/catalog/src/kvbackend/table_cache.rs @@ -0,0 +1,80 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_meta::cache::{CacheContainer, Initializer, TableInfoCacheRef, TableNameCacheRef}; +use common_meta::error::{Result as MetaResult, ValueNotExistSnafu}; +use common_meta::instruction::CacheIdent; +use common_meta::table_name::TableName; +use futures::future::BoxFuture; +use moka::future::Cache; +use snafu::OptionExt; +use table::dist_table::DistTable; +use table::TableRef; + +pub type TableCacheRef = Arc; + +/// [TableCache] caches the [TableName] to [TableRef] mapping. +pub type TableCache = CacheContainer; + +/// Constructs a [TableCache]. +pub fn new_table_cache( + name: String, + cache: Cache, + table_info_cache: TableInfoCacheRef, + table_name_cache: TableNameCacheRef, +) -> TableCache { + let init = init_factory(table_info_cache, table_name_cache); + + CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter)) +} + +fn init_factory( + table_info_cache: TableInfoCacheRef, + table_name_cache: TableNameCacheRef, +) -> Initializer { + Arc::new(move |table_name| { + let table_info_cache = table_info_cache.clone(); + let table_name_cache = table_name_cache.clone(); + Box::pin(async move { + let table_id = table_name_cache + .get_by_ref(table_name) + .await? + .context(ValueNotExistSnafu)?; + let table_info = table_info_cache + .get_by_ref(&table_id) + .await? 
+ .context(ValueNotExistSnafu)?; + + Ok(Some(DistTable::table(table_info))) + }) + }) +} + +fn invalidator<'a>( + cache: &'a Cache, + ident: &'a CacheIdent, +) -> BoxFuture<'a, MetaResult<()>> { + Box::pin(async move { + if let CacheIdent::TableName(table_name) = ident { + cache.invalidate(table_name).await + } + Ok(()) + }) +} + +fn filter(ident: &CacheIdent) -> bool { + matches!(ident, CacheIdent::TableName(_)) +} diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index ae27c4e374..fbef84fdca 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -19,6 +19,7 @@ workspace = true async-trait.workspace = true auth.workspace = true base64.workspace = true +cache.workspace = true catalog.workspace = true chrono.workspace = true clap.workspace = true @@ -52,6 +53,7 @@ lazy_static.workspace = true meta-client.workspace = true meta-srv.workspace = true mito2.workspace = true +moka.workspace = true nu-ansi-term = "0.46" plugins.workspace = true prometheus.workspace = true diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs index 15d0dc9b0d..6c800455ac 100644 --- a/src/cmd/src/error.rs +++ b/src/cmd/src/error.rs @@ -303,6 +303,13 @@ pub enum Error { location: Location, source: common_runtime::error::Error, }, + + #[snafu(display("Failed to get cache from cache registry: {}", name))] + CacheRequired { + #[snafu(implicit)] + location: Location, + name: String, + }, } pub type Result = std::result::Result; @@ -352,6 +359,8 @@ impl ErrorExt for Error { Error::SerdeJson { .. } | Error::FileIo { .. } => StatusCode::Unexpected, + Error::CacheRequired { .. } => StatusCode::Internal, + Error::Other { source, .. } => source.status_code(), Error::BuildRuntime { source, .. } => source.status_code(), diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index e38edc5a0a..98c7a4d793 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -16,10 +16,10 @@ use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; -use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager}; +use cache::{default_cache_registry_builder, TABLE_CACHE_NAME}; +use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend}; use clap::Parser; use client::client_manager::DatanodeClients; -use common_meta::cache_invalidator::MultiCacheInvalidator; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; use common_meta::heartbeat::handler::HandlerGroupExecutor; use common_telemetry::info; @@ -242,22 +242,27 @@ impl StartCommand { .cache_tti(cache_tti) .build(); let cached_meta_backend = Arc::new(cached_meta_backend); - let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::with_invalidators(vec![ - cached_meta_backend.clone(), - ])); + let cache_registry_builder = + default_cache_registry_builder(Arc::new(MetaKvBackend::new(meta_client.clone()))); + let cache_registry = Arc::new( + cache_registry_builder + .add_cache(cached_meta_backend.clone()) + .build(), + ); + let table_cache = cache_registry.get().context(error::CacheRequiredSnafu { + name: TABLE_CACHE_NAME, + })?; let catalog_manager = KvBackendCatalogManager::new( opts.mode, Some(meta_client.clone()), cached_meta_backend.clone(), - multi_cache_invalidator.clone(), + table_cache, ) .await; let executor = HandlerGroupExecutor::new(vec![ Arc::new(ParseMailboxMessageHandler), - Arc::new(InvalidateTableCacheHandler::new( - multi_cache_invalidator.clone(), - )), + Arc::new(InvalidateTableCacheHandler::new(cache_registry.clone())), ]); let heartbeat_task 
= HeartbeatTask::new( @@ -269,12 +274,13 @@ impl StartCommand { let mut instance = FrontendBuilder::new( cached_meta_backend.clone(), + cache_registry.clone(), catalog_manager, Arc::new(DatanodeClients::default()), meta_client, ) .with_plugin(plugins.clone()) - .with_cache_invalidator(multi_cache_invalidator) + .with_local_cache_invalidator(cache_registry) .with_heartbeat_task(heartbeat_task) .try_build() .await diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 0c514512f6..a2679efceb 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -16,11 +16,12 @@ use std::sync::Arc; use std::{fs, path}; use async_trait::async_trait; +use cache::{default_cache_registry_builder, TABLE_CACHE_NAME}; use catalog::kvbackend::KvBackendCatalogManager; use clap::Parser; use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID}; use common_config::{metadata_store_dir, KvBackendConfig}; -use common_meta::cache_invalidator::{CacheInvalidatorRef, MultiCacheInvalidator}; +use common_meta::cache_invalidator::CacheInvalidatorRef; use common_meta::ddl::flow_meta::{FlowMetadataAllocator, FlowMetadataAllocatorRef}; use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef}; use common_meta::ddl::{DdlContext, ProcedureExecutorRef}; @@ -55,12 +56,13 @@ use servers::export_metrics::ExportMetricsOption; use servers::http::HttpOptions; use servers::tls::{TlsMode, TlsOption}; use servers::Mode; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use crate::error::{ - CreateDirSnafu, IllegalConfigSnafu, InitDdlManagerSnafu, InitMetadataSnafu, InitTimezoneSnafu, - Result, ShutdownDatanodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu, - StartProcedureManagerSnafu, StartWalOptionsAllocatorSnafu, StopProcedureManagerSnafu, + CacheRequiredSnafu, CreateDirSnafu, IllegalConfigSnafu, InitDdlManagerSnafu, InitMetadataSnafu, + InitTimezoneSnafu, Result, ShutdownDatanodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu, + StartFrontendSnafu, StartProcedureManagerSnafu, StartWalOptionsAllocatorSnafu, + StopProcedureManagerSnafu, }; use crate::options::{GlobalOptions, Options}; use crate::App; @@ -379,14 +381,12 @@ impl StartCommand { .await .context(StartFrontendSnafu)?; - let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::default()); - let catalog_manager = KvBackendCatalogManager::new( - dn_opts.mode, - None, - kv_backend.clone(), - multi_cache_invalidator.clone(), - ) - .await; + let cache_registry = Arc::new(default_cache_registry_builder(kv_backend.clone()).build()); + let table_cache = cache_registry.get().context(CacheRequiredSnafu { + name: TABLE_CACHE_NAME, + })?; + let catalog_manager = + KvBackendCatalogManager::new(dn_opts.mode, None, kv_backend.clone(), table_cache).await; let builder = DatanodeBuilder::new(dn_opts, fe_plugins.clone()).with_kv_backend(kv_backend.clone()); @@ -424,7 +424,7 @@ impl StartCommand { let ddl_task_executor = Self::create_ddl_task_executor( procedure_manager.clone(), node_manager.clone(), - multi_cache_invalidator, + cache_registry.clone(), table_metadata_manager, table_meta_allocator, flow_metadata_manager, @@ -432,12 +432,17 @@ impl StartCommand { ) .await?; - let mut frontend = - FrontendBuilder::new(kv_backend, catalog_manager, node_manager, ddl_task_executor) - .with_plugin(fe_plugins.clone()) - .try_build() - .await - .context(StartFrontendSnafu)?; + let mut frontend = FrontendBuilder::new( + kv_backend, + cache_registry, + catalog_manager, + node_manager, + ddl_task_executor, 
+ ) + .with_plugin(fe_plugins.clone()) + .try_build() + .await + .context(StartFrontendSnafu)?; let servers = Services::new(fe_opts.clone(), Arc::new(frontend.clone()), fe_plugins) .build() diff --git a/src/common/meta/src/cache.rs b/src/common/meta/src/cache.rs index 49c064ab97..d00c0688cb 100644 --- a/src/common/meta/src/cache.rs +++ b/src/common/meta/src/cache.rs @@ -18,7 +18,7 @@ mod registry; mod table; pub use container::{CacheContainer, Initializer, Invalidator, TokenFilter}; -pub use flow::{new_table_flownode_set_cache, TableFlownodeSetCache}; +pub use flow::{new_table_flownode_set_cache, TableFlownodeSetCache, TableFlownodeSetCacheRef}; pub use registry::{CacheRegistry, CacheRegistryBuilder, CacheRegistryRef}; pub use table::{ new_table_info_cache, new_table_name_cache, new_table_route_cache, TableInfoCache, diff --git a/src/common/meta/src/cache/flow.rs b/src/common/meta/src/cache/flow.rs index 51b16d8999..4430e9fce3 100644 --- a/src/common/meta/src/cache/flow.rs +++ b/src/common/meta/src/cache/flow.rs @@ -13,4 +13,6 @@ // limitations under the License. mod table_flownode; -pub use table_flownode::{new_table_flownode_set_cache, TableFlownodeSetCache}; +pub use table_flownode::{ + new_table_flownode_set_cache, TableFlownodeSetCache, TableFlownodeSetCacheRef, +}; diff --git a/src/common/meta/src/cache/flow/table_flownode.rs b/src/common/meta/src/cache/flow/table_flownode.rs index a339a4daa2..a15578a1b8 100644 --- a/src/common/meta/src/cache/flow/table_flownode.rs +++ b/src/common/meta/src/cache/flow/table_flownode.rs @@ -30,6 +30,8 @@ use crate::FlownodeId; type FlownodeSet = HashSet; +pub type TableFlownodeSetCacheRef = Arc; + /// [TableFlownodeSetCache] caches the [TableId] to [FlownodeSet] mapping. pub type TableFlownodeSetCache = CacheContainer; @@ -54,6 +56,10 @@ fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer>() .await + // We must cache the `HashSet` even if it's empty, + // so that subsequent lookups do not hit the remote storage again; + // if a value is later added to the remote storage, + // the cache invalidation mechanism evicts the stale `(Key, EmptyHashSet)` entry.
.map(Some) }) }) diff --git a/src/common/meta/src/cache/registry.rs b/src/common/meta/src/cache/registry.rs index e330eab9cd..2d8940e70e 100644 --- a/src/common/meta/src/cache/registry.rs +++ b/src/common/meta/src/cache/registry.rs @@ -50,7 +50,7 @@ pub struct CacheRegistry { } #[async_trait::async_trait] -impl CacheInvalidator for Arc { +impl CacheInvalidator for CacheRegistry { async fn invalidate(&self, ctx: &Context, caches: &[CacheIdent]) -> Result<()> { let tasks = self .indexes diff --git a/src/common/meta/src/cache_invalidator.rs b/src/common/meta/src/cache_invalidator.rs index 3281971c1a..81939ad615 100644 --- a/src/common/meta/src/cache_invalidator.rs +++ b/src/common/meta/src/cache_invalidator.rs @@ -14,8 +14,6 @@ use std::sync::Arc; -use tokio::sync::RwLock; - use crate::error::Result; use crate::instruction::CacheIdent; use crate::key::schema_name::SchemaNameKey; @@ -61,34 +59,6 @@ impl CacheInvalidator for DummyCacheInvalidator { } } -#[derive(Default)] -pub struct MultiCacheInvalidator { - invalidators: RwLock>, -} - -impl MultiCacheInvalidator { - pub fn with_invalidators(invalidators: Vec) -> Self { - Self { - invalidators: RwLock::new(invalidators), - } - } - - pub async fn add_invalidator(&self, invalidator: CacheInvalidatorRef) { - self.invalidators.write().await.push(invalidator); - } -} - -#[async_trait::async_trait] -impl CacheInvalidator for MultiCacheInvalidator { - async fn invalidate(&self, ctx: &Context, caches: &[CacheIdent]) -> Result<()> { - let invalidators = self.invalidators.read().await; - for invalidator in invalidators.iter() { - invalidator.invalidate(ctx, caches).await?; - } - Ok(()) - } -} - #[async_trait::async_trait] impl CacheInvalidator for T where diff --git a/src/common/meta/src/ddl/alter_table.rs b/src/common/meta/src/ddl/alter_table.rs index fdb063d176..8bece74e78 100644 --- a/src/common/meta/src/ddl/alter_table.rs +++ b/src/common/meta/src/ddl/alter_table.rs @@ -172,20 +172,16 @@ impl AlterTableProcedure { /// Broadcasts the invalidating table cache instructions. async fn on_broadcast(&mut self) -> Result { - // Safety: Checked in `AlterTableProcedure::new`. - let alter_kind = self.data.task.alter_table.kind.as_ref().unwrap(); let cache_invalidator = &self.context.cache_invalidator; - let cache_keys = if matches!(alter_kind, Kind::RenameTable { .. 
}) { - vec![CacheIdent::TableName(self.data.table_ref().into())] - } else { - vec![ - CacheIdent::TableId(self.data.table_id()), - CacheIdent::TableName(self.data.table_ref().into()), - ] - }; cache_invalidator - .invalidate(&Context::default(), &cache_keys) + .invalidate( + &Context::default(), + &[ + CacheIdent::TableId(self.data.table_id()), + CacheIdent::TableName(self.data.table_ref().into()), + ], + ) .await?; Ok(Status::done()) diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 512531565d..4bc0c52c7a 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -17,6 +17,7 @@ api.workspace = true arc-swap = "1.0" async-trait = "0.1" auth.workspace = true +cache.workspace = true catalog.workspace = true client.workspace = true common-base.workspace = true diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 4cceb710de..1a13aefb85 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -346,6 +346,13 @@ pub enum Error { #[snafu(source)] error: toml::ser::Error, }, + + #[snafu(display("Failed to get cache from cache registry: {}", name))] + CacheRequired { + #[snafu(implicit)] + location: Location, + name: String, + }, } pub type Result = std::result::Result; @@ -397,7 +404,8 @@ impl ErrorExt for Error { Error::FindDatanode { .. } | Error::VectorToGrpcColumn { .. } - | Error::InvalidRegionRequest { .. } => StatusCode::Internal, + | Error::InvalidRegionRequest { .. } + | Error::CacheRequired { .. } => StatusCode::Internal, Error::ContextValueNotFound { .. } => StatusCode::Unexpected, diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs index 45a749a436..e44ee680c5 100644 --- a/src/frontend/src/instance/builder.rs +++ b/src/frontend/src/instance/builder.rs @@ -14,11 +14,12 @@ use std::sync::Arc; +use cache::TABLE_FLOWNODE_SET_CACHE_NAME; use catalog::CatalogManagerRef; use common_base::Plugins; +use common_meta::cache::CacheRegistryRef; use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator}; use common_meta::ddl::ProcedureExecutorRef; -use common_meta::key::flow::TableFlowManager; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::KvBackendRef; use common_meta::node_manager::NodeManagerRef; @@ -31,8 +32,9 @@ use operator::table::TableMutationOperator; use partition::manager::PartitionRuleManager; use query::QueryEngineFactory; use servers::server::ServerHandlers; +use snafu::OptionExt; -use crate::error::Result; +use crate::error::{self, Result}; use crate::heartbeat::HeartbeatTask; use crate::instance::region_query::FrontendRegionQueryHandler; use crate::instance::{Instance, StatementExecutorRef}; @@ -41,7 +43,8 @@ use crate::script::ScriptExecutor; /// The frontend [`Instance`] builder. 
pub struct FrontendBuilder { kv_backend: KvBackendRef, - cache_invalidator: Option, + cache_registry: CacheRegistryRef, + local_cache_invalidator: Option, catalog_manager: CatalogManagerRef, node_manager: NodeManagerRef, plugins: Option, @@ -52,13 +55,15 @@ pub struct FrontendBuilder { impl FrontendBuilder { pub fn new( kv_backend: KvBackendRef, + cache_registry: CacheRegistryRef, catalog_manager: CatalogManagerRef, node_manager: NodeManagerRef, procedure_executor: ProcedureExecutorRef, ) -> Self { Self { kv_backend, - cache_invalidator: None, + cache_registry, + local_cache_invalidator: None, catalog_manager, node_manager, plugins: None, @@ -67,9 +72,9 @@ impl FrontendBuilder { } } - pub fn with_cache_invalidator(self, cache_invalidator: CacheInvalidatorRef) -> Self { + pub fn with_local_cache_invalidator(self, cache_invalidator: CacheInvalidatorRef) -> Self { Self { - cache_invalidator: Some(cache_invalidator), + local_cache_invalidator: Some(cache_invalidator), ..self } } @@ -95,20 +100,24 @@ impl FrontendBuilder { let partition_manager = Arc::new(PartitionRuleManager::new(kv_backend.clone())); - let cache_invalidator = self - .cache_invalidator + let local_cache_invalidator = self + .local_cache_invalidator .unwrap_or_else(|| Arc::new(DummyCacheInvalidator)); let region_query_handler = FrontendRegionQueryHandler::arc(partition_manager.clone(), node_manager.clone()); - let table_flow_manager = Arc::new(TableFlowManager::new(kv_backend.clone())); - + let table_flownode_cache = + self.cache_registry + .get() + .context(error::CacheRequiredSnafu { + name: TABLE_FLOWNODE_SET_CACHE_NAME, + })?; let inserter = Arc::new(Inserter::new( self.catalog_manager.clone(), partition_manager.clone(), node_manager.clone(), - table_flow_manager, + table_flownode_cache, )); let deleter = Arc::new(Deleter::new( self.catalog_manager.clone(), @@ -149,7 +158,7 @@ impl FrontendBuilder { query_engine.clone(), self.procedure_executor, kv_backend.clone(), - cache_invalidator, + local_cache_invalidator, inserter.clone(), )); diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index cf9f44b30e..21eb16e324 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -26,7 +26,7 @@ use catalog::CatalogManagerRef; use client::{OutputData, OutputMeta}; use common_catalog::consts::default_engine; use common_grpc_expr::util::{extract_new_columns, ColumnExpr}; -use common_meta::key::flow::TableFlowManagerRef; +use common_meta::cache::TableFlownodeSetCacheRef; use common_meta::node_manager::{AffectedRows, NodeManagerRef}; use common_meta::peer::Peer; use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE}; @@ -34,7 +34,7 @@ use common_query::Output; use common_telemetry::tracing_context::TracingContext; use common_telemetry::{error, info, warn}; use datatypes::schema::Schema; -use futures_util::{future, TryStreamExt}; +use futures_util::future; use meter_macros::write_meter; use partition::manager::PartitionRuleManagerRef; use session::context::QueryContextRef; @@ -62,7 +62,7 @@ pub struct Inserter { catalog_manager: CatalogManagerRef, partition_manager: PartitionRuleManagerRef, node_manager: NodeManagerRef, - table_flow_manager: TableFlowManagerRef, + table_flownode_set_cache: TableFlownodeSetCacheRef, } pub type InserterRef = Arc; @@ -72,13 +72,13 @@ impl Inserter { catalog_manager: CatalogManagerRef, partition_manager: PartitionRuleManagerRef, node_manager: NodeManagerRef, - table_flow_manager: TableFlowManagerRef, + table_flownode_set_cache: TableFlownodeSetCacheRef, ) -> Self { 
Self { catalog_manager, partition_manager, node_manager, - table_flow_manager, + table_flownode_set_cache, } } @@ -196,10 +196,10 @@ impl Inserter { impl Inserter { fn post_request(&self, requests: RegionInsertRequests) { let node_manager = self.node_manager.clone(); - let table_flow_manager = self.table_flow_manager.clone(); + let table_flownode_set_cache = self.table_flownode_set_cache.clone(); // Spawn all tasks that do job for mirror insert requests for flownode common_runtime::spawn_bg(async move { - match Self::mirror_flow_node_requests(table_flow_manager, requests).await { + match Self::mirror_flow_node_requests(table_flownode_set_cache, requests).await { Ok(flow_tasks) => { let flow_tasks = flow_tasks.into_iter().map(|(peer, inserts)| { let node_manager = node_manager.clone(); @@ -272,7 +272,7 @@ impl Inserter { /// Mirror requests for source table to flownode async fn mirror_flow_node_requests( - table_flow_manager: TableFlowManagerRef, + table_flownode_set_cache: TableFlownodeSetCacheRef, requests: RegionInsertRequests, ) -> Result> { // store partial source table requests used by flow node(only store what's used) @@ -285,17 +285,15 @@ impl Inserter { Some(None) => continue, _ => { let table_id = RegionId::from_u64(req.region_id).table_id(); - let peers = table_flow_manager - .flows(table_id) - // TODO(discord9): determine where to store the flow node address in distributed mode - .map_ok(|key| Peer::new(key.flownode_id(), "")) - .try_collect::>() + // TODO(discord9): determine where to store the flow node address in distributed mode + let peers = table_flownode_set_cache + .get(table_id) .await - .map(|mut v| { - v.dedup(); - v - }) - .context(RequestInsertsSnafu)?; + .context(RequestInsertsSnafu)? + .unwrap_or_default() + .into_iter() + .map(|id| Peer::new(id, "")) + .collect::>(); if !peers.is_empty() { let mut reqs = RegionInsertRequests::default(); diff --git a/tests-fuzz/targets/unstable/fuzz_create_table_standalone.rs b/tests-fuzz/targets/unstable/fuzz_create_table_standalone.rs index c4b60b50d1..2cf3240761 100644 --- a/tests-fuzz/targets/unstable/fuzz_create_table_standalone.rs +++ b/tests-fuzz/targets/unstable/fuzz_create_table_standalone.rs @@ -18,6 +18,7 @@ use std::collections::HashMap; use std::fs::create_dir_all; use std::sync::atomic::AtomicBool; use std::sync::Arc; +use std::time::Duration; use common_telemetry::info; use common_telemetry::tracing::warn; @@ -96,6 +97,7 @@ fn generate_create_table_expr(rng: &mut R) -> CreateTableExpr async fn connect_mysql(addr: &str) -> Pool { loop { match MySqlPoolOptions::new() + .acquire_timeout(Duration::from_secs(30)) .connect(&format!("mysql://{addr}/public")) .await { diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index c8be68ce89..01681c37b0 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -17,6 +17,7 @@ async-stream.workspace = true async-trait = "0.1" auth.workspace = true axum.workspace = true +cache.workspace = true catalog.workspace = true chrono.workspace = true clap.workspace = true @@ -43,6 +44,7 @@ futures.workspace = true futures-util.workspace = true meta-client.workspace = true meta-srv = { workspace = true, features = ["mock"] } +moka.workspace = true mysql_async = { version = "0.33", default-features = false, features = [ "default-rustls", ] } diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index b75b6f11aa..0536cd1d12 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -19,12 +19,12 
@@ use std::time::Duration; use api::v1::region::region_server::RegionServer; use arrow_flight::flight_service_server::FlightServiceServer; +use cache::default_cache_registry_builder; use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend}; use client::client_manager::DatanodeClients; use client::Client; use common_base::Plugins; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; -use common_meta::cache_invalidator::MultiCacheInvalidator; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; use common_meta::heartbeat::handler::HandlerGroupExecutor; use common_meta::kv_backend::chroot::ChrootKvBackend; @@ -346,22 +346,25 @@ impl GreptimeDbClusterBuilder { let cached_meta_backend = Arc::new(CachedMetaKvBackendBuilder::new(meta_client.clone()).build()); - let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::with_invalidators(vec![ - cached_meta_backend.clone(), - ])); + let cache_registry_builder = + default_cache_registry_builder(Arc::new(MetaKvBackend::new(meta_client.clone()))); + let cache_registry = Arc::new( + cache_registry_builder + .add_cache(cached_meta_backend.clone()) + .build(), + ); + let table_cache = cache_registry.get().unwrap(); let catalog_manager = KvBackendCatalogManager::new( Mode::Distributed, Some(meta_client.clone()), cached_meta_backend.clone(), - multi_cache_invalidator.clone(), + table_cache, ) .await; let handlers_executor = HandlerGroupExecutor::new(vec![ Arc::new(ParseMailboxMessageHandler), - Arc::new(InvalidateTableCacheHandler::new( - multi_cache_invalidator.clone(), - )), + Arc::new(InvalidateTableCacheHandler::new(cache_registry.clone())), ]); let heartbeat_task = HeartbeatTask::new( @@ -373,11 +376,12 @@ impl GreptimeDbClusterBuilder { let instance = FrontendBuilder::new( cached_meta_backend.clone(), + cache_registry.clone(), catalog_manager, datanode_clients, meta_client, ) - .with_cache_invalidator(multi_cache_invalidator) + .with_local_cache_invalidator(cache_registry) .with_heartbeat_task(heartbeat_task) .try_build() .await diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 56e3f7c0ab..661d4e0376 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -14,12 +14,12 @@ use std::sync::Arc; +use cache::default_cache_registry_builder; use catalog::kvbackend::KvBackendCatalogManager; use cmd::standalone::StandaloneOptions; use common_base::Plugins; use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID}; use common_config::KvBackendConfig; -use common_meta::cache_invalidator::MultiCacheInvalidator; use common_meta::ddl::flow_meta::FlowMetadataAllocator; use common_meta::ddl::table_meta::TableMetadataAllocator; use common_meta::ddl::DdlContext; @@ -128,12 +128,12 @@ impl GreptimeDbStandaloneBuilder { let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone())); table_metadata_manager.init().await.unwrap(); let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone())); - let multi_cache_invalidator = Arc::new(MultiCacheInvalidator::default()); + let cache_registry = Arc::new(default_cache_registry_builder(kv_backend.clone()).build()); let catalog_manager = KvBackendCatalogManager::new( Mode::Standalone, None, kv_backend.clone(), - multi_cache_invalidator.clone(), + cache_registry.get().unwrap(), ) .await; @@ -167,7 +167,7 @@ impl GreptimeDbStandaloneBuilder { DdlManager::try_new( DdlContext { node_manager: node_manager.clone(), - 
cache_invalidator: multi_cache_invalidator, + cache_invalidator: cache_registry.clone(), memory_region_keeper: Arc::new(MemoryRegionKeeper::default()), table_metadata_manager, table_metadata_allocator, @@ -182,6 +182,7 @@ impl GreptimeDbStandaloneBuilder { let instance = FrontendBuilder::new( kv_backend.clone(), + cache_registry, catalog_manager, node_manager, ddl_task_executor,
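
For context, a minimal sketch of how the new cache registry is expected to be wired into the catalog manager, mirroring the standalone and frontend start-up hunks above. The helper name build_catalog_manager is illustrative only, and the crate paths and signatures (default_cache_registry_builder, CacheRegistry::get, KvBackendCatalogManager::new) are assumed to match what this diff introduces:

use std::sync::Arc;

use cache::{default_cache_registry_builder, TABLE_CACHE_NAME};
use catalog::kvbackend::KvBackendCatalogManager;
use common_config::Mode;
use common_meta::kv_backend::KvBackendRef;

// Build the default cache registry (table info/name/table/flownode-set caches)
// on top of a kv backend, then hand the registered table cache to the catalog
// manager, as the standalone start-up path above does.
async fn build_catalog_manager(kv_backend: KvBackendRef) -> Arc<KvBackendCatalogManager> {
    let cache_registry = Arc::new(default_cache_registry_builder(kv_backend.clone()).build());
    // `get()` returns `None` when no cache of the requested type was registered,
    // which is what the new `CacheRequired` errors in cmd/frontend guard against.
    let table_cache = cache_registry
        .get()
        .unwrap_or_else(|| panic!("{} must be registered", TABLE_CACHE_NAME));
    KvBackendCatalogManager::new(Mode::Standalone, None, kv_backend, table_cache).await
}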