diff --git a/src/common/meta/src/cache.rs b/src/common/meta/src/cache.rs index f7de3027fd..26a6ea2c58 100644 --- a/src/common/meta/src/cache.rs +++ b/src/common/meta/src/cache.rs @@ -14,6 +14,11 @@ mod container; mod flow; +mod table; pub use container::{CacheContainer, Initializer, Invalidator, TokenFilter}; pub use flow::{new_table_flownode_set_cache, TableFlownodeSetCache}; +pub use table::{ + new_table_info_cache, new_table_name_cache, new_table_route_cache, TableInfoCache, + TableInfoCacheRef, TableNameCache, TableNameCacheRef, TableRouteCache, TableRouteCacheRef, +}; diff --git a/src/common/meta/src/cache/container.rs b/src/common/meta/src/cache/container.rs index 2c0c94460a..8c85b3851d 100644 --- a/src/common/meta/src/cache/container.rs +++ b/src/common/meta/src/cache/container.rs @@ -126,6 +126,15 @@ where Ok(()) } + /// Returns true if the cache contains a value for the key. + pub fn contains_key(&self, key: &Q) -> bool + where + K: Borrow, + Q: Hash + Eq + ?Sized, + { + self.cache.contains_key(key) + } + /// Returns a _clone_ of the value corresponding to the key. pub async fn get_by_ref(&self, key: &Q) -> Result> where diff --git a/src/common/meta/src/cache/table.rs b/src/common/meta/src/cache/table.rs new file mode 100644 index 0000000000..016275fa97 --- /dev/null +++ b/src/common/meta/src/cache/table.rs @@ -0,0 +1,20 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod table_info; +mod table_name; +mod table_route; +pub use table_info::{new_table_info_cache, TableInfoCache, TableInfoCacheRef}; +pub use table_name::{new_table_name_cache, TableNameCache, TableNameCacheRef}; +pub use table_route::{new_table_route_cache, TableRouteCache, TableRouteCacheRef}; diff --git a/src/common/meta/src/cache/table/table_info.rs b/src/common/meta/src/cache/table/table_info.rs new file mode 100644 index 0000000000..2f8d188d3d --- /dev/null +++ b/src/common/meta/src/cache/table/table_info.rs @@ -0,0 +1,121 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use futures::future::BoxFuture; +use moka::future::Cache; +use snafu::{OptionExt, ResultExt}; +use store_api::storage::TableId; +use table::metadata::TableInfo; + +use crate::cache::{CacheContainer, Initializer}; +use crate::error; +use crate::error::Result; +use crate::instruction::CacheIdent; +use crate::key::table_info::{TableInfoManager, TableInfoManagerRef}; +use crate::kv_backend::KvBackendRef; + +/// [TableInfoCache] caches the [TableId] to [TableInfo] mapping. +pub type TableInfoCache = CacheContainer, CacheIdent>; + +pub type TableInfoCacheRef = Arc; + +/// Constructs a [TableInfoCache]. +pub fn new_table_info_cache( + name: String, + cache: Cache>, + kv_backend: KvBackendRef, +) -> TableInfoCache { + let table_info_manager = Arc::new(TableInfoManager::new(kv_backend)); + let init = init_factory(table_info_manager); + + CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter)) +} + +fn init_factory(table_info_manager: TableInfoManagerRef) -> Initializer> { + Arc::new(move |table_id| { + let table_info_manager = table_info_manager.clone(); + Box::pin(async move { + let raw_table_info = table_info_manager + .get(*table_id) + .await? + .context(error::ValueNotExistSnafu {})? + .into_inner() + .table_info; + Ok(Some(Arc::new( + TableInfo::try_from(raw_table_info).context(error::ConvertRawTableInfoSnafu)?, + ))) + }) + }) +} + +fn invalidator<'a>( + cache: &'a Cache>, + ident: &'a CacheIdent, +) -> BoxFuture<'a, Result<()>> { + Box::pin(async move { + if let CacheIdent::TableId(table_id) = ident { + cache.invalidate(table_id).await + } + Ok(()) + }) +} + +fn filter(ident: &CacheIdent) -> bool { + matches!(ident, CacheIdent::TableId(_)) +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::Arc; + + use moka::future::CacheBuilder; + + use super::*; + use crate::ddl::test_util::create_table::test_create_table_task; + use crate::key::table_route::TableRouteValue; + use crate::key::TableMetadataManager; + use crate::kv_backend::memory::MemoryKvBackend; + + #[tokio::test] + async fn test_cache() { + let mem_kv = Arc::new(MemoryKvBackend::default()); + let table_metadata_manager = TableMetadataManager::new(mem_kv.clone()); + let cache = CacheBuilder::new(128).build(); + let cache = new_table_info_cache("test".to_string(), cache, mem_kv.clone()); + + let result = cache.get(1024).await.unwrap(); + assert!(result.is_none()); + let task = test_create_table_task("my_table", 1024); + table_metadata_manager + .create_table_metadata( + task.table_info.clone(), + TableRouteValue::physical(vec![]), + HashMap::new(), + ) + .await + .unwrap(); + let table_info = cache.get(1024).await.unwrap().unwrap(); + assert_eq!(*table_info, TableInfo::try_from(task.table_info).unwrap()); + + assert!(cache.contains_key(&1024)); + cache + .invalidate(&[CacheIdent::TableId(1024)]) + .await + .unwrap(); + assert!(!cache.contains_key(&1024)); + } +} diff --git a/src/common/meta/src/cache/table/table_name.rs b/src/common/meta/src/cache/table/table_name.rs new file mode 100644 index 0000000000..0ec88a2d6e --- /dev/null +++ b/src/common/meta/src/cache/table/table_name.rs @@ -0,0 +1,173 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use futures::future::BoxFuture; +use moka::future::Cache; +use snafu::OptionExt; +use table::metadata::TableId; + +use crate::cache::{CacheContainer, Initializer}; +use crate::error; +use crate::error::Result; +use crate::instruction::CacheIdent; +use crate::key::table_name::{TableNameKey, TableNameManager, TableNameManagerRef}; +use crate::kv_backend::KvBackendRef; +use crate::table_name::TableName; + +/// [TableNameCache] caches the [TableName] to [TableId] mapping. +pub type TableNameCache = CacheContainer; + +pub type TableNameCacheRef = Arc; + +/// Constructs a [TableNameCache]. +pub fn new_table_name_cache( + name: String, + cache: Cache, + kv_backend: KvBackendRef, +) -> TableNameCache { + let table_name_manager = Arc::new(TableNameManager::new(kv_backend)); + let init = init_factory(table_name_manager); + + CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter)) +} + +fn init_factory(table_name_manager: TableNameManagerRef) -> Initializer { + Arc::new( + move |TableName { + catalog_name, + schema_name, + table_name, + }| { + let table_name_manager = table_name_manager.clone(); + Box::pin(async move { + Ok(Some( + table_name_manager + .get(TableNameKey { + catalog: catalog_name, + schema: schema_name, + table: table_name, + }) + .await? + .context(error::ValueNotExistSnafu {})? + .table_id(), + )) + }) + }, + ) +} + +fn invalidator<'a>( + cache: &'a Cache, + ident: &'a CacheIdent, +) -> BoxFuture<'a, Result<()>> { + Box::pin(async move { + if let CacheIdent::TableName(table_name) = ident { + cache.invalidate(table_name).await + } + Ok(()) + }) +} + +fn filter(ident: &CacheIdent) -> bool { + matches!(ident, CacheIdent::TableName(_)) +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; + use moka::future::CacheBuilder; + + use super::*; + use crate::kv_backend::memory::MemoryKvBackend; + use crate::kv_backend::txn::TxnService; + + #[tokio::test] + async fn test_cache_get() { + let mem_kv = Arc::new(MemoryKvBackend::default()); + let cache = CacheBuilder::new(128).build(); + let cache = new_table_name_cache("test".to_string(), cache, mem_kv.clone()); + let result = cache + .get_by_ref(&TableName { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: "my_table".to_string(), + }) + .await + .unwrap(); + assert!(result.is_none()); + // Puts a new value. + let table_name_manager = TableNameManager::new(mem_kv.clone()); + let table_id = 1024; + let txn = table_name_manager + .build_create_txn( + &TableNameKey { + catalog: DEFAULT_CATALOG_NAME, + schema: DEFAULT_SCHEMA_NAME, + table: "my_table", + }, + table_id, + ) + .unwrap(); + mem_kv.txn(txn).await.unwrap(); + let got = cache + .get_by_ref(&TableName { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: "my_table".to_string(), + }) + .await + .unwrap() + .unwrap(); + assert_eq!(got, table_id); + } + + #[tokio::test] + async fn test_invalidate_cache() { + let mem_kv = Arc::new(MemoryKvBackend::default()); + let cache = CacheBuilder::new(128).build(); + let cache = new_table_name_cache("test".to_string(), cache, mem_kv.clone()); + // Puts a new value. + let table_name_manager = TableNameManager::new(mem_kv.clone()); + let table_id = 1024; + let table_name = TableName { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: "my_table".to_string(), + }; + let txn = table_name_manager + .build_create_txn( + &TableNameKey { + catalog: DEFAULT_CATALOG_NAME, + schema: DEFAULT_SCHEMA_NAME, + table: "my_table", + }, + table_id, + ) + .unwrap(); + mem_kv.txn(txn).await.unwrap(); + let got = cache.get_by_ref(&table_name).await.unwrap().unwrap(); + assert_eq!(got, table_id); + + assert!(cache.contains_key(&table_name)); + cache + .invalidate(&[CacheIdent::TableName(table_name.clone())]) + .await + .unwrap(); + assert!(!cache.contains_key(&table_name)); + } +} diff --git a/src/common/meta/src/cache/table/table_route.rs b/src/common/meta/src/cache/table/table_route.rs new file mode 100644 index 0000000000..d7667cf81e --- /dev/null +++ b/src/common/meta/src/cache/table/table_route.rs @@ -0,0 +1,137 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use futures::future::BoxFuture; +use moka::future::Cache; +use snafu::OptionExt; +use store_api::storage::TableId; + +use crate::cache::{CacheContainer, Initializer}; +use crate::error; +use crate::error::Result; +use crate::instruction::CacheIdent; +use crate::key::table_route::{TableRouteManager, TableRouteManagerRef, TableRouteValue}; +use crate::kv_backend::KvBackendRef; + +/// [TableRouteCache] caches the [TableId] to [TableRouteValue] mapping. +pub type TableRouteCache = CacheContainer, CacheIdent>; + +pub type TableRouteCacheRef = Arc; + +/// Constructs a [TableRouteCache]. +pub fn new_table_route_cache( + name: String, + cache: Cache>, + kv_backend: KvBackendRef, +) -> TableRouteCache { + let table_info_manager = Arc::new(TableRouteManager::new(kv_backend)); + let init = init_factory(table_info_manager); + + CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter)) +} + +fn init_factory( + table_route_manager: TableRouteManagerRef, +) -> Initializer> { + Arc::new(move |table_id| { + let table_route_manager = table_route_manager.clone(); + Box::pin(async move { + let table_route_value = table_route_manager + .table_route_storage() + .get(*table_id) + .await? + .context(error::ValueNotExistSnafu {})?; + + Ok(Some(Arc::new(table_route_value))) + }) + }) +} + +fn invalidator<'a>( + cache: &'a Cache>, + ident: &'a CacheIdent, +) -> BoxFuture<'a, Result<()>> { + Box::pin(async move { + if let CacheIdent::TableId(table_id) = ident { + cache.invalidate(table_id).await + } + Ok(()) + }) +} + +fn filter(ident: &CacheIdent) -> bool { + matches!(ident, CacheIdent::TableId(_)) +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::Arc; + + use moka::future::CacheBuilder; + use store_api::storage::RegionId; + + use super::*; + use crate::ddl::test_util::create_table::test_create_table_task; + use crate::key::table_route::TableRouteValue; + use crate::key::TableMetadataManager; + use crate::kv_backend::memory::MemoryKvBackend; + use crate::peer::Peer; + use crate::rpc::router::{Region, RegionRoute}; + + #[tokio::test] + async fn test_cache() { + let mem_kv = Arc::new(MemoryKvBackend::default()); + let table_metadata_manager = TableMetadataManager::new(mem_kv.clone()); + let cache = CacheBuilder::new(128).build(); + let cache = new_table_route_cache("test".to_string(), cache, mem_kv.clone()); + + let result = cache.get(1024).await.unwrap(); + assert!(result.is_none()); + let task = test_create_table_task("my_table", 1024); + let table_id = 10; + let region_id = RegionId::new(table_id, 1); + let peer = Peer::empty(1); + let region_routes = vec![RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(peer.clone()), + ..Default::default() + }]; + table_metadata_manager + .create_table_metadata( + task.table_info.clone(), + TableRouteValue::physical(region_routes.clone()), + HashMap::new(), + ) + .await + .unwrap(); + let table_route = cache.get(1024).await.unwrap().unwrap(); + assert_eq!( + (*table_route) + .clone() + .into_physical_table_route() + .region_routes, + region_routes + ); + + assert!(cache.contains_key(&1024)); + cache + .invalidate(&[CacheIdent::TableId(1024)]) + .await + .unwrap(); + assert!(!cache.contains_key(&1024)); + } +} diff --git a/src/common/meta/src/key/table_info.rs b/src/common/meta/src/key/table_info.rs index 7283281c14..3a9847e180 100644 --- a/src/common/meta/src/key/table_info.rs +++ b/src/common/meta/src/key/table_info.rs @@ -14,6 +14,7 @@ use std::collections::HashMap; use std::fmt::Display; +use std::sync::Arc; use serde::{Deserialize, Serialize}; use snafu::OptionExt; @@ -128,6 +129,7 @@ impl TableInfoValue { } } +pub type TableInfoManagerRef = Arc; pub struct TableInfoManager { kv_backend: KvBackendRef, } diff --git a/src/common/meta/src/key/table_name.rs b/src/common/meta/src/key/table_name.rs index 2eb30381fc..6c6c51c375 100644 --- a/src/common/meta/src/key/table_name.rs +++ b/src/common/meta/src/key/table_name.rs @@ -31,7 +31,7 @@ use crate::rpc::store::{BatchGetRequest, RangeRequest}; use crate::rpc::KeyValue; use crate::table_name::TableName; -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] pub struct TableNameKey<'a> { pub catalog: &'a str, pub schema: &'a str, @@ -155,6 +155,8 @@ impl TableNameValue { } } +pub type TableNameManagerRef = Arc; + #[derive(Clone)] pub struct TableNameManager { kv_backend: KvBackendRef, diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs index eca8d702ef..1b8c30ec11 100644 --- a/src/common/meta/src/key/table_route.rs +++ b/src/common/meta/src/key/table_route.rs @@ -14,6 +14,7 @@ use std::collections::{HashMap, HashSet}; use std::fmt::Display; +use std::sync::Arc; use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; @@ -276,6 +277,8 @@ impl Display for TableRouteKey { } } +pub type TableRouteManagerRef = Arc; + pub struct TableRouteManager { storage: TableRouteStorage, }