feat: implement CacheContainer & TableFlownodeSetCache (#3885)

* feat: implement the `CacheContainer`

* feat: implement the `TableFlownodeSetCache`

* chore: remove unused feature

* chore: remove unused `Arc`

* refactor: rename the original `get` to `get_by_ref`

* chore: update comments

* refactor: refactor `CacheContainer`

* chore: move `CacheContainer` to container.rs

* feat: add metrics

* chore: update tests

* test: add tests for nonexistent values

* test: add test for get

* chore: apply suggestions from CR
Authored by Weny Xu on 2024-05-09 18:26:35 +09:00, committed via GitHub
parent ed95e99556
commit b8a325d18c
11 changed files with 592 additions and 3 deletions

Cargo.lock (generated)

@@ -1934,6 +1934,7 @@ dependencies = [
"hyper",
"itertools 0.10.5",
"lazy_static",
"moka",
"prometheus",
"prost 0.12.4",
"rand",

src/common/meta/Cargo.toml

@@ -38,6 +38,7 @@ hex = { version = "0.4" }
humantime-serde.workspace = true
itertools.workspace = true
lazy_static.workspace = true
+moka.workspace = true
prometheus.workspace = true
prost.workspace = true
rand.workspace = true

src/common/meta/src/cache.rs (new)

@@ -0,0 +1,19 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod container;
mod flow;
pub use container::{CacheContainer, Initializer, Invalidator, TokenFilter};
pub use flow::{new_table_flownode_set_cache, TableFlownodeSetCache};

src/common/meta/src/cache/container.rs (new)

@@ -0,0 +1,269 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::borrow::Borrow;
use std::hash::Hash;
use std::sync::Arc;
use futures::future::BoxFuture;
use moka::future::Cache;
use snafu::{OptionExt, ResultExt};
use crate::cache_invalidator::{CacheInvalidator, Context};
use crate::error::{self, Error, Result};
use crate::instruction::CacheIdent;
use crate::metrics;
/// Filters out unused [CacheToken]s
pub type TokenFilter<CacheToken> = Box<dyn Fn(&CacheToken) -> bool + Send + Sync>;
/// Invalidates cached values by [CacheToken]s.
pub type Invalidator<K, V, CacheToken> =
Box<dyn for<'a> Fn(&'a Cache<K, V>, &'a CacheToken) -> BoxFuture<'a, Result<()>> + Send + Sync>;
/// Initializes a value (i.e., fetches it from a remote source).
pub type Initializer<K, V> = Arc<dyn Fn(&'_ K) -> BoxFuture<'_, Result<Option<V>>> + Send + Sync>;
/// A [CacheContainer] provides the ability to:
/// - Cache values loaded by the [Initializer].
/// - Invalidate cached values via the [Invalidator].
pub struct CacheContainer<K, V, CacheToken> {
name: String,
cache: Cache<K, V>,
invalidator: Invalidator<K, V, CacheToken>,
initializer: Initializer<K, V>,
token_filter: TokenFilter<CacheToken>,
}
impl<K, V, CacheToken> CacheContainer<K, V, CacheToken>
where
K: Send + Sync,
V: Send + Sync,
CacheToken: Send + Sync,
{
/// Constructs a [CacheContainer].
pub fn new(
name: String,
cache: Cache<K, V>,
invalidator: Invalidator<K, V, CacheToken>,
initializer: Initializer<K, V>,
token_filter: TokenFilter<CacheToken>,
) -> Self {
Self {
name,
cache,
invalidator,
initializer,
token_filter,
}
}
}
#[async_trait::async_trait]
impl<K, V> CacheInvalidator for CacheContainer<K, V, CacheIdent>
where
K: Send + Sync,
V: Send + Sync,
{
async fn invalidate(&self, _ctx: &Context, caches: Vec<CacheIdent>) -> Result<()> {
for token in caches
.into_iter()
.filter(|token| (self.token_filter)(token))
{
(self.invalidator)(&self.cache, &token).await?;
}
Ok(())
}
}
impl<K, V, CacheToken> CacheContainer<K, V, CacheToken>
where
K: Copy + Hash + Eq + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
{
/// Returns a _clone_ of the value corresponding to the key.
pub async fn get(&self, key: K) -> Result<Option<V>> {
let moved_init = self.initializer.clone();
let moved_key = key;
let init = async move {
moved_init(&moved_key)
.await
.transpose()
.context(error::ValueNotExistSnafu)?
};
match self.cache.try_get_with(key, init).await {
Ok(value) => Ok(Some(value)),
// moka's `try_get_with` wraps initializer errors in an `Arc`; a
// `ValueNotExist` error means the value is absent remotely, not a failure.
Err(err) => match err.as_ref() {
Error::ValueNotExist { .. } => Ok(None),
_ => Err(err).context(error::GetCacheSnafu),
},
}
}
}
impl<K, V, CacheToken> CacheContainer<K, V, CacheToken>
where
K: Hash + Eq + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
{
/// Invalidates cached values by [CacheToken]s.
pub async fn invalidate(&self, caches: &[CacheToken]) -> Result<()> {
for token in caches.iter().filter(|token| (self.token_filter)(token)) {
(self.invalidator)(&self.cache, token).await?;
}
Ok(())
}
/// Returns a _clone_ of the value corresponding to the key.
pub async fn get_by_ref<Q>(&self, key: &Q) -> Result<Option<V>>
where
K: Borrow<Q>,
Q: ToOwned<Owned = K> + Hash + Eq + ?Sized,
{
metrics::CACHE_CONTAINER_CACHE_GET
.with_label_values(&[&self.name])
.inc();
let moved_init = self.initializer.clone();
let moved_key = key.to_owned();
let init = async move {
metrics::CACHE_CONTAINER_CACHE_MISS
.with_label_values(&[&self.name])
.inc();
moved_init(&moved_key)
.await
.transpose()
.context(error::ValueNotExistSnafu)?
};
match self.cache.try_get_with_by_ref(key, init).await {
Ok(value) => Ok(Some(value)),
Err(err) => match err.as_ref() {
Error::ValueNotExist { .. } => Ok(None),
_ => Err(err).context(error::GetCacheSnafu),
},
}
}
}
#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::Arc;
use moka::future::{Cache, CacheBuilder};
use super::*;
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
struct NameKey<'a> {
name: &'a str,
}
#[tokio::test]
async fn test_get() {
let cache: Cache<NameKey, String> = CacheBuilder::new(128).build();
let filter: TokenFilter<String> = Box::new(|_| true);
let counter = Arc::new(AtomicI32::new(0));
let moved_counter = counter.clone();
let init: Initializer<NameKey, String> = Arc::new(move |_| {
moved_counter.fetch_add(1, Ordering::Relaxed);
Box::pin(async { Ok(Some("hi".to_string())) })
});
let invalidator: Invalidator<NameKey, String, String> =
Box::new(|_, _| Box::pin(async { Ok(()) }));
let adv_cache = CacheContainer::new("test".to_string(), cache, invalidator, init, filter);
let key = NameKey { name: "key" };
let value = adv_cache.get(key).await.unwrap().unwrap();
assert_eq!(value, "hi");
assert_eq!(counter.load(Ordering::Relaxed), 1);
let key = NameKey { name: "key" };
let value = adv_cache.get(key).await.unwrap().unwrap();
assert_eq!(value, "hi");
assert_eq!(counter.load(Ordering::Relaxed), 1);
}
#[tokio::test]
async fn test_get_by_ref() {
let cache: Cache<String, String> = CacheBuilder::new(128).build();
let filter: TokenFilter<String> = Box::new(|_| true);
let counter = Arc::new(AtomicI32::new(0));
let moved_counter = counter.clone();
let init: Initializer<String, String> = Arc::new(move |_| {
moved_counter.fetch_add(1, Ordering::Relaxed);
Box::pin(async { Ok(Some("hi".to_string())) })
});
let invalidator: Invalidator<String, String, String> =
Box::new(|_, _| Box::pin(async { Ok(()) }));
let adv_cache = CacheContainer::new("test".to_string(), cache, invalidator, init, filter);
let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap();
assert_eq!(value, "hi");
let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap();
assert_eq!(value, "hi");
assert_eq!(counter.load(Ordering::Relaxed), 1);
let value = adv_cache.get_by_ref("bar").await.unwrap().unwrap();
assert_eq!(value, "hi");
assert_eq!(counter.load(Ordering::Relaxed), 2);
}
#[tokio::test]
async fn test_get_value_not_exists() {
let cache: Cache<String, String> = CacheBuilder::new(128).build();
let filter: TokenFilter<String> = Box::new(|_| true);
let init: Initializer<String, String> =
Arc::new(move |_| Box::pin(async { error::ValueNotExistSnafu {}.fail() }));
let invalidator: Invalidator<String, String, String> =
Box::new(|_, _| Box::pin(async { Ok(()) }));
let adv_cache = CacheContainer::new("test".to_string(), cache, invalidator, init, filter);
let value = adv_cache.get_by_ref("foo").await.unwrap();
assert!(value.is_none());
}
#[tokio::test]
async fn test_invalidate() {
let cache: Cache<String, String> = CacheBuilder::new(128).build();
let filter: TokenFilter<String> = Box::new(|_| true);
let counter = Arc::new(AtomicI32::new(0));
let moved_counter = counter.clone();
let init: Initializer<String, String> = Arc::new(move |_| {
moved_counter.fetch_add(1, Ordering::Relaxed);
Box::pin(async { Ok(Some("hi".to_string())) })
});
let invalidator: Invalidator<String, String, String> = Box::new(|cache, key| {
Box::pin(async move {
cache.invalidate(key).await;
Ok(())
})
});
let adv_cache = CacheContainer::new("test".to_string(), cache, invalidator, init, filter);
let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap();
assert_eq!(value, "hi");
let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap();
assert_eq!(value, "hi");
assert_eq!(counter.load(Ordering::Relaxed), 1);
adv_cache
.invalidate(&["foo".to_string(), "bar".to_string()])
.await
.unwrap();
let value = adv_cache.get_by_ref("foo").await.unwrap().unwrap();
assert_eq!(value, "hi");
assert_eq!(counter.load(Ordering::Relaxed), 2);
}
}

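For reference, here is a minimal usage sketch of the container (the function name, key/value types, and values are illustrative and not part of the diff; it mirrors the tests above):

use std::sync::Arc;

use moka::future::CacheBuilder;

use crate::cache::{CacheContainer, Initializer, Invalidator, TokenFilter};
use crate::error::Result;

async fn example() -> Result<()> {
    // Values are loaded on miss; returning `Ok(None)` means "does not exist".
    let init: Initializer<String, String> = Arc::new(|key: &String| {
        let key = key.clone();
        Box::pin(async move { Ok(Some(format!("value-of-{key}"))) })
    });
    // Invalidation simply evicts the key from the inner moka cache.
    let invalidator: Invalidator<String, String, String> = Box::new(|cache, key| {
        Box::pin(async move {
            cache.invalidate(key).await;
            Ok(())
        })
    });
    // Accept every token.
    let filter: TokenFilter<String> = Box::new(|_| true);

    let container = CacheContainer::new(
        "example".to_string(),
        CacheBuilder::new(128).build(),
        invalidator,
        init,
        filter,
    );
    // The first call runs the initializer; the second is served from the cache.
    assert_eq!(
        container.get_by_ref("foo").await?.as_deref(),
        Some("value-of-foo")
    );
    Ok(())
}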
src/common/meta/src/cache/flow.rs (new)

@@ -0,0 +1,16 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod table_flownode;
pub use table_flownode::{new_table_flownode_set_cache, TableFlownodeSetCache};

src/common/meta/src/cache/flow/table_flownode.rs (new)

@@ -0,0 +1,241 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashSet;
use std::sync::Arc;
use futures::future::BoxFuture;
use futures::TryStreamExt;
use moka::future::Cache;
use moka::ops::compute::Op;
use table::metadata::TableId;
use crate::cache::{CacheContainer, Initializer};
use crate::error::Result;
use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
use crate::key::flow::{TableFlowManager, TableFlowManagerRef};
use crate::kv_backend::KvBackendRef;
use crate::FlownodeId;
type FlownodeSet = HashSet<FlownodeId>;
/// [TableFlownodeSetCache] caches the [TableId] to [FlownodeSet] mapping.
pub type TableFlownodeSetCache = CacheContainer<TableId, FlownodeSet, CacheIdent>;
/// Constructs a [TableFlownodeSetCache].
pub fn new_table_flownode_set_cache(
name: String,
cache: Cache<TableId, FlownodeSet>,
kv_backend: KvBackendRef,
) -> TableFlownodeSetCache {
let table_flow_manager = Arc::new(TableFlowManager::new(kv_backend));
let init = init_factory(table_flow_manager);
CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter))
}
fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId, FlownodeSet> {
Arc::new(move |&table_id| {
let table_flow_manager = table_flow_manager.clone();
Box::pin(async move {
table_flow_manager
.flows(table_id)
.map_ok(|key| key.flownode_id())
.try_collect::<HashSet<_>>()
.await
.map(Some)
})
})
}
async fn handle_create_flow(
cache: &Cache<TableId, FlownodeSet>,
CreateFlow {
source_table_ids,
flownode_ids,
}: &CreateFlow,
) {
for table_id in source_table_ids {
let entry = cache.entry(*table_id);
entry
.and_compute_with(
async |entry: Option<moka::Entry<u32, HashSet<u64>>>| match entry {
Some(entry) => {
let mut set = entry.into_value();
set.extend(flownode_ids.iter().cloned());
Op::Put(set)
}
None => Op::Put(HashSet::from_iter(flownode_ids.iter().cloned())),
},
)
.await;
}
}
async fn handle_drop_flow(
cache: &Cache<TableId, FlownodeSet>,
DropFlow {
source_table_ids,
flownode_ids,
}: &DropFlow,
) {
for table_id in source_table_ids {
let entry = cache.entry(*table_id);
entry
.and_compute_with(
async |entry: Option<moka::Entry<u32, HashSet<u64>>>| match entry {
Some(entry) => {
let mut set = entry.into_value();
for flownode_id in flownode_ids {
set.remove(flownode_id);
}
Op::Put(set)
}
None => {
// Do nothing
Op::Nop
}
},
)
.await;
}
}
fn invalidator<'a>(
cache: &'a Cache<TableId, FlownodeSet>,
ident: &'a CacheIdent,
) -> BoxFuture<'a, Result<()>> {
Box::pin(async move {
match ident {
CacheIdent::CreateFlow(create_flow) => handle_create_flow(cache, create_flow).await,
CacheIdent::DropFlow(drop_flow) => handle_drop_flow(cache, drop_flow).await,
_ => {}
}
Ok(())
})
}
fn filter(ident: &CacheIdent) -> bool {
matches!(ident, CacheIdent::CreateFlow(_) | CacheIdent::DropFlow(_))
}
#[cfg(test)]
mod tests {
use std::collections::{BTreeMap, HashSet};
use std::sync::Arc;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use moka::future::CacheBuilder;
use crate::cache::flow::table_flownode::new_table_flownode_set_cache;
use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
use crate::key::flow::flow_info::FlowInfoValue;
use crate::key::flow::FlowMetadataManager;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::table_name::TableName;
#[tokio::test]
async fn test_cache_empty_set() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let cache = CacheBuilder::new(128).build();
let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
let set = cache.get(1024).await.unwrap().unwrap();
assert!(set.is_empty());
}
#[tokio::test]
async fn test_get() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let flownode_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
flownode_metadata_manager
.create_flow_metadata(
1024,
FlowInfoValue {
source_table_ids: vec![1024, 1025],
sink_table_name: TableName {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "sink_table".to_string(),
},
flownode_ids: BTreeMap::from([(0, 1), (1, 2), (2, 3)]),
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
flow_name: "my_flow".to_string(),
raw_sql: "sql".to_string(),
expire_when: "expire".to_string(),
comment: "comment".to_string(),
options: Default::default(),
},
)
.await
.unwrap();
let cache = CacheBuilder::new(128).build();
let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
let set = cache.get(1024).await.unwrap().unwrap();
assert_eq!(set, HashSet::from([1, 2, 3]));
let set = cache.get(1025).await.unwrap().unwrap();
assert_eq!(set, HashSet::from([1, 2, 3]));
let result = cache.get(1026).await.unwrap().unwrap();
assert_eq!(result.len(), 0);
}
#[tokio::test]
async fn test_create_flow() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let cache = CacheBuilder::new(128).build();
let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
let ident = vec![CacheIdent::CreateFlow(CreateFlow {
source_table_ids: vec![1024, 1025],
flownode_ids: vec![1, 2, 3, 4, 5],
})];
cache.invalidate(&ident).await.unwrap();
let set = cache.get(1024).await.unwrap().unwrap();
assert_eq!(set.len(), 5);
let set = cache.get(1025).await.unwrap().unwrap();
assert_eq!(set.len(), 5);
}
#[tokio::test]
async fn test_drop_flow() {
let mem_kv = Arc::new(MemoryKvBackend::default());
let cache = CacheBuilder::new(128).build();
let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
let ident = vec![
CacheIdent::CreateFlow(CreateFlow {
source_table_ids: vec![1024, 1025],
flownode_ids: vec![1, 2, 3, 4, 5],
}),
CacheIdent::CreateFlow(CreateFlow {
source_table_ids: vec![1024, 1025],
flownode_ids: vec![11, 12],
}),
];
cache.invalidate(&ident).await.unwrap();
let set = cache.get(1024).await.unwrap().unwrap();
assert_eq!(set.len(), 7);
let set = cache.get(1025).await.unwrap().unwrap();
assert_eq!(set.len(), 7);
let ident = vec![CacheIdent::DropFlow(DropFlow {
source_table_ids: vec![1024, 1025],
flownode_ids: vec![1, 2, 3, 4, 5],
})];
cache.invalidate(&ident).await.unwrap();
let set = cache.get(1024).await.unwrap().unwrap();
assert_eq!(set, HashSet::from([11, 12]));
let set = cache.get(1025).await.unwrap().unwrap();
assert_eq!(set, HashSet::from([11, 12]));
}
}

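For reference, a condensed sketch of how flow DDL invalidation drives this cache (it mirrors the tests above; the function name, cache name, and ids are illustrative):

use std::collections::HashSet;
use std::sync::Arc;

use moka::future::CacheBuilder;

use crate::cache::new_table_flownode_set_cache;
use crate::error::Result;
use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
use crate::kv_backend::memory::MemoryKvBackend;

async fn example() -> Result<()> {
    let mem_kv = Arc::new(MemoryKvBackend::default());
    let cache = new_table_flownode_set_cache(
        "flows".to_string(),
        CacheBuilder::new(128).build(),
        mem_kv,
    );
    // `CreateFlow` merges the flownodes into each source table's cached set ...
    cache
        .invalidate(&[CacheIdent::CreateFlow(CreateFlow {
            source_table_ids: vec![42],
            flownode_ids: vec![1, 2],
        })])
        .await?;
    assert_eq!(cache.get(42).await?.unwrap(), HashSet::from([1, 2]));
    // ... and `DropFlow` removes them again.
    cache
        .invalidate(&[CacheIdent::DropFlow(DropFlow {
            source_table_ids: vec![42],
            flownode_ids: vec![1],
        })])
        .await?;
    assert_eq!(cache.get(42).await?.unwrap(), HashSet::from([2]));
    Ok(())
}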
src/common/meta/src/cache_invalidator.rs

@@ -112,6 +112,10 @@ where
let key: SchemaNameKey = (&schema_name).into();
self.invalidate_key(&key.to_bytes()).await;
}
+CacheIdent::CreateFlow(_) | CacheIdent::DropFlow(_) => {
+// TODO(weny): implement it
+unimplemented!()
+}
}
}
Ok(())

src/common/meta/src/error.rs

@@ -13,6 +13,7 @@
// limitations under the License.
use std::str::Utf8Error;
+use std::sync::Arc;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
@@ -453,6 +454,12 @@ pub enum Error {
error: std::string::FromUtf8Error,
location: Location,
},
#[snafu(display("Value not exists"))]
ValueNotExist { location: Location },
#[snafu(display("Failed to get cache"))]
GetCache { source: Arc<Error> },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -466,7 +473,9 @@ impl ErrorExt for Error {
| EtcdFailed { .. }
| EtcdTxnFailed { .. }
| ConnectEtcd { .. }
-| MoveValues { .. } => StatusCode::Internal,
+| MoveValues { .. }
+| ValueNotExist { .. }
+| GetCache { .. } => StatusCode::Internal,
SerdeJson { .. }
| ParseOption { .. }

src/common/meta/src/instruction.rs

@@ -23,7 +23,7 @@ use table::metadata::TableId;
use crate::key::schema_name::SchemaName;
use crate::table_name::TableName;
-use crate::{ClusterId, DatanodeId};
+use crate::{ClusterId, DatanodeId, FlownodeId};
#[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct RegionIdent {
@@ -152,12 +152,26 @@ pub struct UpgradeRegion {
pub wait_for_replay_timeout: Option<Duration>,
}
-#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq, Eq)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
/// The identifier of cache.
pub enum CacheIdent {
TableId(TableId),
TableName(TableName),
SchemaName(SchemaName),
+CreateFlow(CreateFlow),
+DropFlow(DropFlow),
}
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+pub struct CreateFlow {
+pub source_table_ids: Vec<TableId>,
+pub flownode_ids: Vec<FlownodeId>,
+}
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+pub struct DropFlow {
+pub source_table_ids: Vec<TableId>,
+pub flownode_ids: Vec<FlownodeId>,
+}
#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]

src/common/meta/src/lib.rs

@@ -19,6 +19,7 @@
#![feature(extract_if)]
#![feature(hash_extract_if)]
+pub mod cache;
pub mod cache_invalidator;
pub mod cluster;
pub mod ddl;

src/common/meta/src/metrics.rs

@@ -75,4 +75,18 @@ lazy_static! {
&["step"]
)
.unwrap();
+/// Cache container cache get counter.
+pub static ref CACHE_CONTAINER_CACHE_GET: IntCounterVec = register_int_counter_vec!(
+"greptime_meta_cache_container_cache_get",
+"cache container cache get",
+&["name"]
+)
+.unwrap();
+/// Cache container cache miss counter.
+pub static ref CACHE_CONTAINER_CACHE_MISS: IntCounterVec = register_int_counter_vec!(
+"greptime_meta_cache_container_cache_miss",
+"cache container cache miss",
+&["name"]
+)
+.unwrap();
}
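
Note that `CACHE_CONTAINER_CACHE_GET` is incremented on every `get_by_ref` call, while `CACHE_CONTAINER_CACHE_MISS` is incremented only inside the initializer future, which moka runs once per missing key (the by-value `get` is not instrumented in this change). A hit ratio can therefore be derived from the two counters; a sketch, with an illustrative label value:

let gets = metrics::CACHE_CONTAINER_CACHE_GET
    .with_label_values(&["example"])
    .get() as f64;
let misses = metrics::CACHE_CONTAINER_CACHE_MISS
    .with_label_values(&["example"])
    .get() as f64;
let hit_ratio = if gets > 0.0 { 1.0 - misses / gets } else { 0.0 };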