diff --git a/config/frontend.example.toml b/config/frontend.example.toml index ed4809ca84..29219fe4b8 100644 --- a/config/frontend.example.toml +++ b/config/frontend.example.toml @@ -69,6 +69,13 @@ timeout = "3s" ddl_timeout = "10s" connect_timeout = "1s" tcp_nodelay = true +# The configuration about the cache of the Metadata. +# default: 100000 +metadata_cache_max_capacity = 100000 +# default: 10m +metadata_cache_ttl = "10m" +# default: 5m +metadata_cache_tti = "5m" # Log options, see `standalone.example.toml` # [logging] diff --git a/src/catalog/src/kvbackend.rs b/src/catalog/src/kvbackend.rs index e26e06e80e..6a1aa9e2ac 100644 --- a/src/catalog/src/kvbackend.rs +++ b/src/catalog/src/kvbackend.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub use client::{CachedMetaKvBackend, MetaKvBackend}; +pub use client::{CachedMetaKvBackend, CachedMetaKvBackendBuilder, MetaKvBackend}; mod client; mod manager; diff --git a/src/catalog/src/kvbackend/client.rs b/src/catalog/src/kvbackend/client.rs index 5ab34072e9..c0f3547609 100644 --- a/src/catalog/src/kvbackend/client.rs +++ b/src/catalog/src/kvbackend/client.rs @@ -14,8 +14,10 @@ use std::any::Any; use std::fmt::Debug; -use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; use std::time::Duration; +use std::usize; use common_error::ext::BoxedError; use common_meta::cache_invalidator::KvCacheInvalidator; @@ -33,18 +35,91 @@ use meta_client::client::MetaClient; use moka::future::{Cache, CacheBuilder}; use snafu::{OptionExt, ResultExt}; -use crate::metrics::{METRIC_CATALOG_KV_GET, METRIC_CATALOG_KV_REMOTE_GET}; +use crate::metrics::{ + METRIC_CATALOG_KV_BATCH_GET, METRIC_CATALOG_KV_GET, METRIC_CATALOG_KV_REMOTE_GET, +}; -const CACHE_MAX_CAPACITY: u64 = 10000; -const CACHE_TTL_SECOND: u64 = 10 * 60; -const CACHE_TTI_SECOND: u64 = 5 * 60; +const DEFAULT_CACHE_MAX_CAPACITY: u64 = 10000; +const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(10 * 60); +const DEFAULT_CACHE_TTI: Duration = Duration::from_secs(5 * 60); + +pub struct CachedMetaKvBackendBuilder { + cache_max_capacity: Option, + cache_ttl: Option, + cache_tti: Option, + meta_client: Arc, +} + +impl CachedMetaKvBackendBuilder { + pub fn new(meta_client: Arc) -> Self { + Self { + cache_max_capacity: None, + cache_ttl: None, + cache_tti: None, + meta_client, + } + } + + pub fn cache_max_capacity(mut self, cache_max_capacity: u64) -> Self { + self.cache_max_capacity.replace(cache_max_capacity); + self + } + + pub fn cache_ttl(mut self, cache_ttl: Duration) -> Self { + self.cache_ttl.replace(cache_ttl); + self + } + + pub fn cache_tti(mut self, cache_tti: Duration) -> Self { + self.cache_tti.replace(cache_tti); + self + } + + pub fn build(self) -> CachedMetaKvBackend { + let cache_max_capacity = self + .cache_max_capacity + .unwrap_or(DEFAULT_CACHE_MAX_CAPACITY); + let cache_ttl = self.cache_ttl.unwrap_or(DEFAULT_CACHE_TTL); + let cache_tti = self.cache_tti.unwrap_or(DEFAULT_CACHE_TTI); + + let cache = Arc::new( + CacheBuilder::new(cache_max_capacity) + .time_to_live(cache_ttl) + .time_to_idle(cache_tti) + .build(), + ); + + let kv_backend = Arc::new(MetaKvBackend { + client: self.meta_client, + }); + let name = format!("CachedKvBackend({})", kv_backend.name()); + let version = AtomicUsize::new(0); + + CachedMetaKvBackend { + kv_backend, + cache, + name, + version, + } + } +} pub type CacheBackendRef = Arc, KeyValue>>; +/// A wrapper of `MetaKvBackend` with cache support. +/// +/// CachedMetaKvBackend is mainly used to read metadata information from Metasrv, and provides +/// cache for get and batch_get. One way to trigger cache invalidation of CachedMetaKvBackend: +/// when metadata information changes, Metasrv will broadcast a metadata invalidation request. +/// +/// Therefore, it is recommended to use CachedMetaKvBackend to only read metadata related +/// information. Note: If you read other information, you may read expired data, which depends on +/// TTL and TTI for cache. pub struct CachedMetaKvBackend { kv_backend: KvBackendRef, cache: CacheBackendRef, name: String, + version: AtomicUsize, } impl TxnService for CachedMetaKvBackend { @@ -96,7 +171,38 @@ impl KvBackend for CachedMetaKvBackend { } async fn batch_get(&self, req: BatchGetRequest) -> Result { - self.kv_backend.batch_get(req).await + let _timer = METRIC_CATALOG_KV_BATCH_GET.start_timer(); + + let mut kvs = Vec::with_capacity(req.keys.len()); + let mut miss_keys = Vec::with_capacity(req.keys.len()); + + for key in req.keys { + if let Some(val) = self.cache.get(&key).await { + kvs.push(val); + } else { + miss_keys.push(key); + } + } + + let batch_get_req = BatchGetRequest::new().with_keys(miss_keys.clone()); + + let pre_version = self.version(); + + let unhit_kvs = self.kv_backend.batch_get(batch_get_req).await?.kvs; + + for kv in unhit_kvs.iter() { + self.cache.insert(kv.key().to_vec(), kv.clone()).await; + } + + if !self.validate_version(pre_version) { + for key in miss_keys.iter() { + self.cache.invalidate(key).await; + } + } + + kvs.extend(unhit_kvs); + + Ok(BatchGetResponse { kvs }) } async fn compare_and_put(&self, req: CompareAndPutRequest) -> Result { @@ -154,8 +260,14 @@ impl KvBackend for CachedMetaKvBackend { async fn get(&self, key: &[u8]) -> Result> { let _timer = METRIC_CATALOG_KV_GET.start_timer(); + let pre_version = Arc::new(Mutex::new(None)); + let init = async { + let version_clone = pre_version.clone(); let _timer = METRIC_CATALOG_KV_REMOTE_GET.start_timer(); + + version_clone.lock().unwrap().replace(self.version()); + self.kv_backend.get(key).await.map(|val| { val.with_context(|| CacheNotGetSnafu { key: String::from_utf8_lossy(key), @@ -166,7 +278,7 @@ impl KvBackend for CachedMetaKvBackend { // currently moka doesn't have `optionally_try_get_with_by_ref` // TODO(fys): change to moka method when available // https://github.com/moka-rs/moka/issues/254 - match self.cache.try_get_with_by_ref(key, init).await { + let ret = match self.cache.try_get_with_by_ref(key, init).await { Ok(val) => Ok(Some(val)), Err(e) => match e.as_ref() { CacheNotGet { .. } => Ok(None), @@ -175,29 +287,40 @@ impl KvBackend for CachedMetaKvBackend { } .map_err(|e| GetKvCache { err_msg: e.to_string(), - }) + }); + + // "cache.invalidate_key" and "cache.try_get_with_by_ref" are not mutually exclusive. So we need + // to use the version mechanism to prevent expired data from being put into the cache. + if pre_version + .lock() + .unwrap() + .as_ref() + .map_or(false, |v| !self.validate_version(*v)) + { + self.cache.invalidate(key).await; + } + + ret } } #[async_trait::async_trait] impl KvCacheInvalidator for CachedMetaKvBackend { async fn invalidate_key(&self, key: &[u8]) { + self.create_new_version(); self.cache.invalidate(key).await; debug!("invalidated cache key: {}", String::from_utf8_lossy(key)); } } impl CachedMetaKvBackend { - pub fn new(client: Arc) -> Self { - let kv_backend = Arc::new(MetaKvBackend { client }); - Self::wrap(kv_backend) - } - - pub fn wrap(kv_backend: KvBackendRef) -> Self { + // only for test + #[cfg(test)] + fn wrap(kv_backend: KvBackendRef) -> Self { let cache = Arc::new( - CacheBuilder::new(CACHE_MAX_CAPACITY) - .time_to_live(Duration::from_secs(CACHE_TTL_SECOND)) - .time_to_idle(Duration::from_secs(CACHE_TTI_SECOND)) + CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY) + .time_to_live(DEFAULT_CACHE_TTL) + .time_to_idle(DEFAULT_CACHE_TTI) .build(), ); @@ -206,12 +329,25 @@ impl CachedMetaKvBackend { kv_backend, cache, name, + version: AtomicUsize::new(0), } } pub fn cache(&self) -> &CacheBackendRef { &self.cache } + + fn version(&self) -> usize { + self.version.load(Ordering::Relaxed) + } + + fn validate_version(&self, pre_version: usize) -> bool { + self.version() == pre_version + } + + fn create_new_version(&self) -> usize { + self.version.fetch_add(1, Ordering::Relaxed) + 1 + } } #[derive(Debug)] @@ -308,3 +444,162 @@ impl KvBackend for MetaKvBackend { self } } + +#[cfg(test)] +mod tests { + use std::any::Any; + use std::sync::atomic::{AtomicU32, Ordering}; + use std::sync::Arc; + + use async_trait::async_trait; + use common_meta::kv_backend::{KvBackend, TxnService}; + use common_meta::rpc::store::{ + BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, + BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, + DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, + RangeResponse, + }; + use common_meta::rpc::KeyValue; + use dashmap::DashMap; + + use super::CachedMetaKvBackend; + + #[derive(Default)] + pub struct SimpleKvBackend { + inner_map: DashMap, Vec>, + get_execute_times: Arc, + } + + impl TxnService for SimpleKvBackend { + type Error = common_meta::error::Error; + } + + #[async_trait] + impl KvBackend for SimpleKvBackend { + fn name(&self) -> &str { + "SimpleKvBackend" + } + + fn as_any(&self) -> &dyn Any { + self + } + + async fn batch_get(&self, req: BatchGetRequest) -> Result { + let mut kvs = Vec::with_capacity(req.keys.len()); + for key in req.keys.iter() { + if let Some(kv) = self.get(key).await? { + kvs.push(kv); + } + } + Ok(BatchGetResponse { kvs }) + } + + async fn put(&self, req: PutRequest) -> Result { + self.inner_map.insert(req.key, req.value); + // always return None as prev_kv, since we don't use it in this test. + Ok(PutResponse { prev_kv: None }) + } + + async fn get(&self, key: &[u8]) -> Result, Self::Error> { + self.get_execute_times + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + Ok(self.inner_map.get(key).map(|v| KeyValue { + key: key.to_vec(), + value: v.value().clone(), + })) + } + + async fn range(&self, _req: RangeRequest) -> Result { + todo!() + } + + async fn batch_put(&self, _req: BatchPutRequest) -> Result { + todo!() + } + + async fn compare_and_put( + &self, + _req: CompareAndPutRequest, + ) -> Result { + todo!() + } + + async fn delete_range( + &self, + _req: DeleteRangeRequest, + ) -> Result { + todo!() + } + + async fn batch_delete( + &self, + _req: BatchDeleteRequest, + ) -> Result { + todo!() + } + } + + #[tokio::test] + async fn test_cached_kv_backend() { + let simple_kv = Arc::new(SimpleKvBackend::default()); + let get_execute_times = simple_kv.get_execute_times.clone(); + let cached_kv = CachedMetaKvBackend::wrap(simple_kv); + + add_some_vals(&cached_kv).await; + + let batch_get_req = BatchGetRequest { + keys: vec![b"k1".to_vec(), b"k2".to_vec()], + }; + + assert_eq!(get_execute_times.load(Ordering::SeqCst), 0); + + for _ in 0..10 { + let _batch_get_resp = cached_kv.batch_get(batch_get_req.clone()).await.unwrap(); + + assert_eq!(get_execute_times.load(Ordering::SeqCst), 2); + } + + let batch_get_req = BatchGetRequest { + keys: vec![b"k1".to_vec(), b"k2".to_vec(), b"k3".to_vec()], + }; + + let _batch_get_resp = cached_kv.batch_get(batch_get_req.clone()).await.unwrap(); + + assert_eq!(get_execute_times.load(Ordering::SeqCst), 3); + + for _ in 0..10 { + let _batch_get_resp = cached_kv.batch_get(batch_get_req.clone()).await.unwrap(); + + assert_eq!(get_execute_times.load(Ordering::SeqCst), 3); + } + } + + async fn add_some_vals(kv_backend: &impl KvBackend) { + kv_backend + .put(PutRequest { + key: b"k1".to_vec(), + value: b"v1".to_vec(), + prev_kv: false, + }) + .await + .unwrap(); + + kv_backend + .put(PutRequest { + key: b"k2".to_vec(), + value: b"v2".to_vec(), + prev_kv: false, + }) + .await + .unwrap(); + + kv_backend + .put(PutRequest { + key: b"k3".to_vec(), + value: b"v3".to_vec(), + prev_kv: false, + }) + .await + .unwrap(); + } +} diff --git a/src/catalog/src/metrics.rs b/src/catalog/src/metrics.rs index 8039ae13f8..77063d64ba 100644 --- a/src/catalog/src/metrics.rs +++ b/src/catalog/src/metrics.rs @@ -32,4 +32,6 @@ lazy_static! { register_histogram!("greptime_catalog_kv_get_remote", "catalog kv get remote").unwrap(); pub static ref METRIC_CATALOG_KV_GET: Histogram = register_histogram!("greptime_catalog_kv_get", "catalog kv get").unwrap(); + pub static ref METRIC_CATALOG_KV_BATCH_GET: Histogram = + register_histogram!("greptime_catalog_kv_batch_get", "catalog kv batch get").unwrap(); } diff --git a/src/cmd/src/cli/repl.rs b/src/cmd/src/cli/repl.rs index c8f5ae8418..ccc9d0a272 100644 --- a/src/cmd/src/cli/repl.rs +++ b/src/cmd/src/cli/repl.rs @@ -16,7 +16,9 @@ use std::path::PathBuf; use std::sync::Arc; use std::time::Instant; -use catalog::kvbackend::{CachedMetaKvBackend, KvBackendCatalogManager}; +use catalog::kvbackend::{ + CachedMetaKvBackend, CachedMetaKvBackendBuilder, KvBackendCatalogManager, +}; use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_base::Plugins; use common_error::ext::ErrorExt; @@ -248,7 +250,8 @@ async fn create_query_engine(meta_addr: &str) -> Result { .context(StartMetaClientSnafu)?; let meta_client = Arc::new(meta_client); - let cached_meta_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone())); + let cached_meta_backend = + Arc::new(CachedMetaKvBackendBuilder::new(meta_client.clone()).build()); let catalog_list = KvBackendCatalogManager::new(cached_meta_backend.clone(), cached_meta_backend); diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index cf710c8c05..149103f10f 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; -use catalog::kvbackend::CachedMetaKvBackend; +use catalog::kvbackend::CachedMetaKvBackendBuilder; use clap::Parser; use client::client_manager::DatanodeClients; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; @@ -228,15 +228,27 @@ impl StartCommand { let meta_client_options = opts.meta_client.as_ref().context(MissingConfigSnafu { msg: "'meta_client'", })?; + + let cache_max_capacity = meta_client_options.metadata_cache_max_capacity; + let cache_ttl = meta_client_options.metadata_cache_ttl; + let cache_tti = meta_client_options.metadata_cache_tti; + let meta_client = FeInstance::create_meta_client(meta_client_options) .await .context(StartFrontendSnafu)?; - let meta_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone())); + let cached_meta_backend = CachedMetaKvBackendBuilder::new(meta_client.clone()) + .cache_max_capacity(cache_max_capacity) + .cache_ttl(cache_ttl) + .cache_tti(cache_tti) + .build(); + let cached_meta_backend = Arc::new(cached_meta_backend); let executor = HandlerGroupExecutor::new(vec![ Arc::new(ParseMailboxMessageHandler), - Arc::new(InvalidateTableCacheHandler::new(meta_backend.clone())), + Arc::new(InvalidateTableCacheHandler::new( + cached_meta_backend.clone(), + )), ]); let heartbeat_task = HeartbeatTask::new( @@ -246,11 +258,11 @@ impl StartCommand { ); let mut instance = FrontendBuilder::new( - meta_backend.clone(), + cached_meta_backend.clone(), Arc::new(DatanodeClients::default()), meta_client, ) - .with_cache_invalidator(meta_backend) + .with_cache_invalidator(cached_meta_backend) .with_plugin(plugins.clone()) .with_heartbeat_task(heartbeat_task) .try_build() diff --git a/src/meta-client/src/lib.rs b/src/meta-client/src/lib.rs index 29be282bc5..f7329b4fb6 100644 --- a/src/meta-client/src/lib.rs +++ b/src/meta-client/src/lib.rs @@ -36,6 +36,14 @@ pub struct MetaClientOptions { #[serde(with = "humantime_serde")] pub connect_timeout: Duration, pub tcp_nodelay: bool, + #[serde(default = "default_metadata_cache_max_capacity")] + pub metadata_cache_max_capacity: u64, + #[serde(default = "default_metadata_cache_ttl")] + #[serde(with = "humantime_serde")] + pub metadata_cache_ttl: Duration, + #[serde(default = "default_metadata_cache_tti")] + #[serde(with = "humantime_serde")] + pub metadata_cache_tti: Duration, } fn default_heartbeat_timeout() -> Duration { @@ -54,6 +62,18 @@ fn default_timeout() -> Duration { Duration::from_millis(3_000u64) } +fn default_metadata_cache_max_capacity() -> u64 { + 100_000u64 +} + +fn default_metadata_cache_ttl() -> Duration { + Duration::from_secs(600u64) +} + +fn default_metadata_cache_tti() -> Duration { + Duration::from_secs(300u64) +} + impl Default for MetaClientOptions { fn default() -> Self { Self { @@ -63,6 +83,9 @@ impl Default for MetaClientOptions { ddl_timeout: default_ddl_timeout(), connect_timeout: default_connect_timeout(), tcp_nodelay: true, + metadata_cache_max_capacity: default_metadata_cache_max_capacity(), + metadata_cache_ttl: default_metadata_cache_ttl(), + metadata_cache_tti: default_metadata_cache_tti(), } } } diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 362a8c5e8b..cace9345ca 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -20,7 +20,7 @@ use std::time::Duration; use api::v1::meta::Role; use api::v1::region::region_server::RegionServer; use arrow_flight::flight_service_server::FlightServiceServer; -use catalog::kvbackend::{CachedMetaKvBackend, MetaKvBackend}; +use catalog::kvbackend::{CachedMetaKvBackendBuilder, MetaKvBackend}; use client::client_manager::DatanodeClients; use client::Client; use common_base::Plugins; @@ -350,11 +350,14 @@ impl GreptimeDbClusterBuilder { meta_client.start(&[&meta_srv.server_addr]).await.unwrap(); let meta_client = Arc::new(meta_client); - let meta_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone())); + let cached_meta_backend = + Arc::new(CachedMetaKvBackendBuilder::new(meta_client.clone()).build()); let handlers_executor = HandlerGroupExecutor::new(vec![ Arc::new(ParseMailboxMessageHandler), - Arc::new(InvalidateTableCacheHandler::new(meta_backend.clone())), + Arc::new(InvalidateTableCacheHandler::new( + cached_meta_backend.clone(), + )), ]); let heartbeat_task = HeartbeatTask::new( @@ -363,12 +366,13 @@ impl GreptimeDbClusterBuilder { Arc::new(handlers_executor), ); - let instance = FrontendBuilder::new(meta_backend.clone(), datanode_clients, meta_client) - .with_cache_invalidator(meta_backend) - .with_heartbeat_task(heartbeat_task) - .try_build() - .await - .unwrap(); + let instance = + FrontendBuilder::new(cached_meta_backend.clone(), datanode_clients, meta_client) + .with_cache_invalidator(cached_meta_backend) + .with_heartbeat_task(heartbeat_task) + .try_build() + .await + .unwrap(); Arc::new(instance) }