feat: introduce TableNameCache & TableInfoCache & TableRouteCache (#3895)

* feat: implement the `TableNameCache`

* feat: implement the `TableInfoCache`

* feat: implement the `TableRouteCache`

* test: add tests for `TableInfoCache` & `TableRouteCache`

* chore: use `TableId`
This commit is contained in:
Weny Xu
2024-05-10 18:50:44 +09:00
committed by GitHub
parent 25e9076f5b
commit c91132bd14
9 changed files with 473 additions and 1 deletions

View File

@@ -14,6 +14,11 @@
mod container;
mod flow;
mod table;
pub use container::{CacheContainer, Initializer, Invalidator, TokenFilter};
pub use flow::{new_table_flownode_set_cache, TableFlownodeSetCache};
pub use table::{
new_table_info_cache, new_table_name_cache, new_table_route_cache, TableInfoCache,
TableInfoCacheRef, TableNameCache, TableNameCacheRef, TableRouteCache, TableRouteCacheRef,
};

View File

@@ -126,6 +126,15 @@ where
Ok(())
}
/// Returns true if the cache contains a value for the key.
pub fn contains_key<Q>(&self, key: &Q) -> bool
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
self.cache.contains_key(key)
}
/// Returns a _clone_ of the value corresponding to the key.
pub async fn get_by_ref<Q>(&self, key: &Q) -> Result<Option<V>>
where

20
src/common/meta/src/cache/table.rs vendored Normal file
View File

@@ -0,0 +1,20 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod table_info;
mod table_name;
mod table_route;
pub use table_info::{new_table_info_cache, TableInfoCache, TableInfoCacheRef};
pub use table_name::{new_table_name_cache, TableNameCache, TableNameCacheRef};
pub use table_route::{new_table_route_cache, TableRouteCache, TableRouteCacheRef};

View File

@@ -0,0 +1,121 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use futures::future::BoxFuture;
use moka::future::Cache;
use snafu::{OptionExt, ResultExt};
use store_api::storage::TableId;
use table::metadata::TableInfo;
use crate::cache::{CacheContainer, Initializer};
use crate::error;
use crate::error::Result;
use crate::instruction::CacheIdent;
use crate::key::table_info::{TableInfoManager, TableInfoManagerRef};
use crate::kv_backend::KvBackendRef;
/// [TableInfoCache] caches the [TableId] to [TableInfo] mapping.
pub type TableInfoCache = CacheContainer<TableId, Arc<TableInfo>, CacheIdent>;
pub type TableInfoCacheRef = Arc<TableInfoCache>;
/// Constructs a [TableInfoCache].
pub fn new_table_info_cache(
name: String,
cache: Cache<TableId, Arc<TableInfo>>,
kv_backend: KvBackendRef,
) -> TableInfoCache {
let table_info_manager = Arc::new(TableInfoManager::new(kv_backend));
let init = init_factory(table_info_manager);
CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter))
}
fn init_factory(table_info_manager: TableInfoManagerRef) -> Initializer<TableId, Arc<TableInfo>> {
Arc::new(move |table_id| {
let table_info_manager = table_info_manager.clone();
Box::pin(async move {
let raw_table_info = table_info_manager
.get(*table_id)
.await?
.context(error::ValueNotExistSnafu {})?
.into_inner()
.table_info;
Ok(Some(Arc::new(
TableInfo::try_from(raw_table_info).context(error::ConvertRawTableInfoSnafu)?,
)))
})
})
}
fn invalidator<'a>(
cache: &'a Cache<TableId, Arc<TableInfo>>,
ident: &'a CacheIdent,
) -> BoxFuture<'a, Result<()>> {
Box::pin(async move {
if let CacheIdent::TableId(table_id) = ident {
cache.invalidate(table_id).await
}
Ok(())
})
}
fn filter(ident: &CacheIdent) -> bool {
matches!(ident, CacheIdent::TableId(_))
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use moka::future::CacheBuilder;
use super::*;
use crate::ddl::test_util::create_table::test_create_table_task;
use crate::key::table_route::TableRouteValue;
use crate::key::TableMetadataManager;
use crate::kv_backend::memory::MemoryKvBackend;
#[tokio::test]
async fn test_cache() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
let cache = CacheBuilder::new(128).build();
let cache = new_table_info_cache("test".to_string(), cache, mem_kv.clone());
let result = cache.get(1024).await.unwrap();
assert!(result.is_none());
let task = test_create_table_task("my_table", 1024);
table_metadata_manager
.create_table_metadata(
task.table_info.clone(),
TableRouteValue::physical(vec![]),
HashMap::new(),
)
.await
.unwrap();
let table_info = cache.get(1024).await.unwrap().unwrap();
assert_eq!(*table_info, TableInfo::try_from(task.table_info).unwrap());
assert!(cache.contains_key(&1024));
cache
.invalidate(&[CacheIdent::TableId(1024)])
.await
.unwrap();
assert!(!cache.contains_key(&1024));
}
}

View File

@@ -0,0 +1,173 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use futures::future::BoxFuture;
use moka::future::Cache;
use snafu::OptionExt;
use table::metadata::TableId;
use crate::cache::{CacheContainer, Initializer};
use crate::error;
use crate::error::Result;
use crate::instruction::CacheIdent;
use crate::key::table_name::{TableNameKey, TableNameManager, TableNameManagerRef};
use crate::kv_backend::KvBackendRef;
use crate::table_name::TableName;
/// [TableNameCache] caches the [TableName] to [TableId] mapping.
pub type TableNameCache = CacheContainer<TableName, TableId, CacheIdent>;
pub type TableNameCacheRef = Arc<TableNameCache>;
/// Constructs a [TableNameCache].
pub fn new_table_name_cache(
name: String,
cache: Cache<TableName, TableId>,
kv_backend: KvBackendRef,
) -> TableNameCache {
let table_name_manager = Arc::new(TableNameManager::new(kv_backend));
let init = init_factory(table_name_manager);
CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter))
}
fn init_factory(table_name_manager: TableNameManagerRef) -> Initializer<TableName, TableId> {
Arc::new(
move |TableName {
catalog_name,
schema_name,
table_name,
}| {
let table_name_manager = table_name_manager.clone();
Box::pin(async move {
Ok(Some(
table_name_manager
.get(TableNameKey {
catalog: catalog_name,
schema: schema_name,
table: table_name,
})
.await?
.context(error::ValueNotExistSnafu {})?
.table_id(),
))
})
},
)
}
fn invalidator<'a>(
cache: &'a Cache<TableName, TableId>,
ident: &'a CacheIdent,
) -> BoxFuture<'a, Result<()>> {
Box::pin(async move {
if let CacheIdent::TableName(table_name) = ident {
cache.invalidate(table_name).await
}
Ok(())
})
}
fn filter(ident: &CacheIdent) -> bool {
matches!(ident, CacheIdent::TableName(_))
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use moka::future::CacheBuilder;
use super::*;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::kv_backend::txn::TxnService;
#[tokio::test]
async fn test_cache_get() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let cache = CacheBuilder::new(128).build();
let cache = new_table_name_cache("test".to_string(), cache, mem_kv.clone());
let result = cache
.get_by_ref(&TableName {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "my_table".to_string(),
})
.await
.unwrap();
assert!(result.is_none());
// Puts a new value.
let table_name_manager = TableNameManager::new(mem_kv.clone());
let table_id = 1024;
let txn = table_name_manager
.build_create_txn(
&TableNameKey {
catalog: DEFAULT_CATALOG_NAME,
schema: DEFAULT_SCHEMA_NAME,
table: "my_table",
},
table_id,
)
.unwrap();
mem_kv.txn(txn).await.unwrap();
let got = cache
.get_by_ref(&TableName {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "my_table".to_string(),
})
.await
.unwrap()
.unwrap();
assert_eq!(got, table_id);
}
#[tokio::test]
async fn test_invalidate_cache() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let cache = CacheBuilder::new(128).build();
let cache = new_table_name_cache("test".to_string(), cache, mem_kv.clone());
// Puts a new value.
let table_name_manager = TableNameManager::new(mem_kv.clone());
let table_id = 1024;
let table_name = TableName {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "my_table".to_string(),
};
let txn = table_name_manager
.build_create_txn(
&TableNameKey {
catalog: DEFAULT_CATALOG_NAME,
schema: DEFAULT_SCHEMA_NAME,
table: "my_table",
},
table_id,
)
.unwrap();
mem_kv.txn(txn).await.unwrap();
let got = cache.get_by_ref(&table_name).await.unwrap().unwrap();
assert_eq!(got, table_id);
assert!(cache.contains_key(&table_name));
cache
.invalidate(&[CacheIdent::TableName(table_name.clone())])
.await
.unwrap();
assert!(!cache.contains_key(&table_name));
}
}

View File

@@ -0,0 +1,137 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use futures::future::BoxFuture;
use moka::future::Cache;
use snafu::OptionExt;
use store_api::storage::TableId;
use crate::cache::{CacheContainer, Initializer};
use crate::error;
use crate::error::Result;
use crate::instruction::CacheIdent;
use crate::key::table_route::{TableRouteManager, TableRouteManagerRef, TableRouteValue};
use crate::kv_backend::KvBackendRef;
/// [TableRouteCache] caches the [TableId] to [TableRouteValue] mapping.
pub type TableRouteCache = CacheContainer<TableId, Arc<TableRouteValue>, CacheIdent>;
pub type TableRouteCacheRef = Arc<TableRouteCache>;
/// Constructs a [TableRouteCache].
pub fn new_table_route_cache(
name: String,
cache: Cache<TableId, Arc<TableRouteValue>>,
kv_backend: KvBackendRef,
) -> TableRouteCache {
let table_info_manager = Arc::new(TableRouteManager::new(kv_backend));
let init = init_factory(table_info_manager);
CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter))
}
fn init_factory(
table_route_manager: TableRouteManagerRef,
) -> Initializer<TableId, Arc<TableRouteValue>> {
Arc::new(move |table_id| {
let table_route_manager = table_route_manager.clone();
Box::pin(async move {
let table_route_value = table_route_manager
.table_route_storage()
.get(*table_id)
.await?
.context(error::ValueNotExistSnafu {})?;
Ok(Some(Arc::new(table_route_value)))
})
})
}
fn invalidator<'a>(
cache: &'a Cache<TableId, Arc<TableRouteValue>>,
ident: &'a CacheIdent,
) -> BoxFuture<'a, Result<()>> {
Box::pin(async move {
if let CacheIdent::TableId(table_id) = ident {
cache.invalidate(table_id).await
}
Ok(())
})
}
fn filter(ident: &CacheIdent) -> bool {
matches!(ident, CacheIdent::TableId(_))
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use moka::future::CacheBuilder;
use store_api::storage::RegionId;
use super::*;
use crate::ddl::test_util::create_table::test_create_table_task;
use crate::key::table_route::TableRouteValue;
use crate::key::TableMetadataManager;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::peer::Peer;
use crate::rpc::router::{Region, RegionRoute};
#[tokio::test]
async fn test_cache() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
let cache = CacheBuilder::new(128).build();
let cache = new_table_route_cache("test".to_string(), cache, mem_kv.clone());
let result = cache.get(1024).await.unwrap();
assert!(result.is_none());
let task = test_create_table_task("my_table", 1024);
let table_id = 10;
let region_id = RegionId::new(table_id, 1);
let peer = Peer::empty(1);
let region_routes = vec![RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(peer.clone()),
..Default::default()
}];
table_metadata_manager
.create_table_metadata(
task.table_info.clone(),
TableRouteValue::physical(region_routes.clone()),
HashMap::new(),
)
.await
.unwrap();
let table_route = cache.get(1024).await.unwrap().unwrap();
assert_eq!(
(*table_route)
.clone()
.into_physical_table_route()
.region_routes,
region_routes
);
assert!(cache.contains_key(&1024));
cache
.invalidate(&[CacheIdent::TableId(1024)])
.await
.unwrap();
assert!(!cache.contains_key(&1024));
}
}

View File

@@ -14,6 +14,7 @@
use std::collections::HashMap;
use std::fmt::Display;
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
@@ -128,6 +129,7 @@ impl TableInfoValue {
}
}
pub type TableInfoManagerRef = Arc<TableInfoManager>;
pub struct TableInfoManager {
kv_backend: KvBackendRef,
}

View File

@@ -31,7 +31,7 @@ use crate::rpc::store::{BatchGetRequest, RangeRequest};
use crate::rpc::KeyValue;
use crate::table_name::TableName;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub struct TableNameKey<'a> {
pub catalog: &'a str,
pub schema: &'a str,
@@ -155,6 +155,8 @@ impl TableNameValue {
}
}
pub type TableNameManagerRef = Arc<TableNameManager>;
#[derive(Clone)]
pub struct TableNameManager {
kv_backend: KvBackendRef,

View File

@@ -14,6 +14,7 @@
use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
@@ -276,6 +277,8 @@ impl Display for TableRouteKey {
}
}
pub type TableRouteManagerRef = Arc<TableRouteManager>;
pub struct TableRouteManager {
storage: TableRouteStorage,
}