feat: support cache for batch_get in CachedMetaKvBackend (#3277)

* feat: support cache for batch_get in CachedMetaKvBackend.

* add doc of CachedMetaKvBackend

* fix: address code review comments

* fix: correct some words

* fix: address further code review comments
Author: fys
Date: 2024-02-06 11:15:03 +08:00
Committed by: GitHub
Parent: 4cbdf64d52
Commit: 74bfb09195
8 changed files with 380 additions and 34 deletions

View File

@@ -69,6 +69,13 @@ timeout = "3s"
ddl_timeout = "10s"
connect_timeout = "1s"
tcp_nodelay = true
# Configuration of the metadata cache.
# default: 100000
metadata_cache_max_capacity = 100000
# default: 10m
metadata_cache_ttl = "10m"
# default: 5m
metadata_cache_tti = "5m"
# Log options, see `standalone.example.toml`
# [logging]

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub use client::{CachedMetaKvBackend, MetaKvBackend};
pub use client::{CachedMetaKvBackend, CachedMetaKvBackendBuilder, MetaKvBackend};
mod client;
mod manager;

View File

@@ -14,8 +14,10 @@
use std::any::Any;
use std::fmt::Debug;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::usize;
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::KvCacheInvalidator;
@@ -33,18 +35,91 @@ use meta_client::client::MetaClient;
use moka::future::{Cache, CacheBuilder};
use snafu::{OptionExt, ResultExt};
use crate::metrics::{METRIC_CATALOG_KV_GET, METRIC_CATALOG_KV_REMOTE_GET};
use crate::metrics::{
METRIC_CATALOG_KV_BATCH_GET, METRIC_CATALOG_KV_GET, METRIC_CATALOG_KV_REMOTE_GET,
};
const CACHE_MAX_CAPACITY: u64 = 10000;
const CACHE_TTL_SECOND: u64 = 10 * 60;
const CACHE_TTI_SECOND: u64 = 5 * 60;
const DEFAULT_CACHE_MAX_CAPACITY: u64 = 10000;
const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(10 * 60);
const DEFAULT_CACHE_TTI: Duration = Duration::from_secs(5 * 60);
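/// Builder for [`CachedMetaKvBackend`].
///
/// A minimal usage sketch, assuming an already-constructed `Arc<MetaClient>` named
/// `meta_client`; options left unset fall back to the `DEFAULT_*` constants above:
///
/// ```ignore
/// let cached_backend = CachedMetaKvBackendBuilder::new(meta_client.clone())
///     .cache_max_capacity(100_000)
///     .cache_ttl(Duration::from_secs(10 * 60))
///     .cache_tti(Duration::from_secs(5 * 60))
///     .build();
/// ```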
pub struct CachedMetaKvBackendBuilder {
cache_max_capacity: Option<u64>,
cache_ttl: Option<Duration>,
cache_tti: Option<Duration>,
meta_client: Arc<MetaClient>,
}
impl CachedMetaKvBackendBuilder {
pub fn new(meta_client: Arc<MetaClient>) -> Self {
Self {
cache_max_capacity: None,
cache_ttl: None,
cache_tti: None,
meta_client,
}
}
pub fn cache_max_capacity(mut self, cache_max_capacity: u64) -> Self {
self.cache_max_capacity.replace(cache_max_capacity);
self
}
pub fn cache_ttl(mut self, cache_ttl: Duration) -> Self {
self.cache_ttl.replace(cache_ttl);
self
}
pub fn cache_tti(mut self, cache_tti: Duration) -> Self {
self.cache_tti.replace(cache_tti);
self
}
pub fn build(self) -> CachedMetaKvBackend {
let cache_max_capacity = self
.cache_max_capacity
.unwrap_or(DEFAULT_CACHE_MAX_CAPACITY);
let cache_ttl = self.cache_ttl.unwrap_or(DEFAULT_CACHE_TTL);
let cache_tti = self.cache_tti.unwrap_or(DEFAULT_CACHE_TTI);
let cache = Arc::new(
CacheBuilder::new(cache_max_capacity)
.time_to_live(cache_ttl)
.time_to_idle(cache_tti)
.build(),
);
let kv_backend = Arc::new(MetaKvBackend {
client: self.meta_client,
});
let name = format!("CachedKvBackend({})", kv_backend.name());
let version = AtomicUsize::new(0);
CachedMetaKvBackend {
kv_backend,
cache,
name,
version,
}
}
}
pub type CacheBackendRef = Arc<Cache<Vec<u8>, KeyValue>>;
/// A wrapper around `MetaKvBackend` that adds caching.
///
/// `CachedMetaKvBackend` is mainly used to read metadata from Metasrv, and it caches the results
/// of `get` and `batch_get`. Cache invalidation is triggered in one way: when metadata changes,
/// Metasrv broadcasts a metadata invalidation request.
///
/// Therefore, it is recommended to use `CachedMetaKvBackend` only for reading metadata-related
/// information. Note: reading other information through it may return stale data, depending on
/// the cache TTL and TTI.
pub struct CachedMetaKvBackend {
kv_backend: KvBackendRef,
cache: CacheBackendRef,
name: String,
version: AtomicUsize,
}
impl TxnService for CachedMetaKvBackend {
@@ -96,7 +171,38 @@ impl KvBackend for CachedMetaKvBackend {
}
async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
self.kv_backend.batch_get(req).await
let _timer = METRIC_CATALOG_KV_BATCH_GET.start_timer();
let mut kvs = Vec::with_capacity(req.keys.len());
let mut miss_keys = Vec::with_capacity(req.keys.len());
for key in req.keys {
if let Some(val) = self.cache.get(&key).await {
kvs.push(val);
} else {
miss_keys.push(key);
}
}
let batch_get_req = BatchGetRequest::new().with_keys(miss_keys.clone());
let pre_version = self.version();
let unhit_kvs = self.kv_backend.batch_get(batch_get_req).await?.kvs;
for kv in unhit_kvs.iter() {
self.cache.insert(kv.key().to_vec(), kv.clone()).await;
}
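// The remote `batch_get` and `invalidate_key` are not mutually exclusive; if the version
// changed while the missed keys were being fetched, drop the entries just inserted so that
// stale values cannot remain in the cache.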
if !self.validate_version(pre_version) {
for key in miss_keys.iter() {
self.cache.invalidate(key).await;
}
}
kvs.extend(unhit_kvs);
Ok(BatchGetResponse { kvs })
}
async fn compare_and_put(&self, req: CompareAndPutRequest) -> Result<CompareAndPutResponse> {
@@ -154,8 +260,14 @@ impl KvBackend for CachedMetaKvBackend {
async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>> {
let _timer = METRIC_CATALOG_KV_GET.start_timer();
let pre_version = Arc::new(Mutex::new(None));
let init = async {
let version_clone = pre_version.clone();
let _timer = METRIC_CATALOG_KV_REMOTE_GET.start_timer();
version_clone.lock().unwrap().replace(self.version());
self.kv_backend.get(key).await.map(|val| {
val.with_context(|| CacheNotGetSnafu {
key: String::from_utf8_lossy(key),
@@ -166,7 +278,7 @@ impl KvBackend for CachedMetaKvBackend {
// currently moka doesn't have `optionally_try_get_with_by_ref`
// TODO(fys): change to moka method when available
// https://github.com/moka-rs/moka/issues/254
match self.cache.try_get_with_by_ref(key, init).await {
let ret = match self.cache.try_get_with_by_ref(key, init).await {
Ok(val) => Ok(Some(val)),
Err(e) => match e.as_ref() {
CacheNotGet { .. } => Ok(None),
@@ -175,29 +287,40 @@ impl KvBackend for CachedMetaKvBackend {
}
.map_err(|e| GetKvCache {
err_msg: e.to_string(),
})
});
// "cache.invalidate_key" and "cache.try_get_with_by_ref" are not mutually exclusive. So we need
// to use the version mechanism to prevent expired data from being put into the cache.
if pre_version
.lock()
.unwrap()
.as_ref()
.map_or(false, |v| !self.validate_version(*v))
{
self.cache.invalidate(key).await;
}
ret
}
}
#[async_trait::async_trait]
impl KvCacheInvalidator for CachedMetaKvBackend {
async fn invalidate_key(&self, key: &[u8]) {
self.create_new_version();
self.cache.invalidate(key).await;
debug!("invalidated cache key: {}", String::from_utf8_lossy(key));
}
}
impl CachedMetaKvBackend {
pub fn new(client: Arc<MetaClient>) -> Self {
let kv_backend = Arc::new(MetaKvBackend { client });
Self::wrap(kv_backend)
}
pub fn wrap(kv_backend: KvBackendRef) -> Self {
// only for test
#[cfg(test)]
fn wrap(kv_backend: KvBackendRef) -> Self {
let cache = Arc::new(
CacheBuilder::new(CACHE_MAX_CAPACITY)
.time_to_live(Duration::from_secs(CACHE_TTL_SECOND))
.time_to_idle(Duration::from_secs(CACHE_TTI_SECOND))
CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
.time_to_live(DEFAULT_CACHE_TTL)
.time_to_idle(DEFAULT_CACHE_TTI)
.build(),
);
@@ -206,12 +329,25 @@ impl CachedMetaKvBackend {
kv_backend,
cache,
name,
version: AtomicUsize::new(0),
}
}
pub fn cache(&self) -> &CacheBackendRef {
&self.cache
}
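// The version counter implements a simple optimistic check: `invalidate_key` bumps it via
// `create_new_version`, and readers capture it before a remote fetch and re-check it with
// `validate_version` afterwards. A mismatch means an invalidation raced with the fetch, so
// the freshly inserted cache entries are dropped.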
fn version(&self) -> usize {
self.version.load(Ordering::Relaxed)
}
fn validate_version(&self, pre_version: usize) -> bool {
self.version() == pre_version
}
fn create_new_version(&self) -> usize {
self.version.fetch_add(1, Ordering::Relaxed) + 1
}
}
#[derive(Debug)]
@@ -308,3 +444,162 @@ impl KvBackend for MetaKvBackend {
self
}
}
#[cfg(test)]
mod tests {
use std::any::Any;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use async_trait::async_trait;
use common_meta::kv_backend::{KvBackend, TxnService};
use common_meta::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse,
BatchPutRequest, BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse,
DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse, RangeRequest,
RangeResponse,
};
use common_meta::rpc::KeyValue;
use dashmap::DashMap;
use super::CachedMetaKvBackend;
#[derive(Default)]
pub struct SimpleKvBackend {
inner_map: DashMap<Vec<u8>, Vec<u8>>,
get_execute_times: Arc<AtomicU32>,
}
impl TxnService for SimpleKvBackend {
type Error = common_meta::error::Error;
}
#[async_trait]
impl KvBackend for SimpleKvBackend {
fn name(&self) -> &str {
"SimpleKvBackend"
}
fn as_any(&self) -> &dyn Any {
self
}
async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse, Self::Error> {
let mut kvs = Vec::with_capacity(req.keys.len());
for key in req.keys.iter() {
if let Some(kv) = self.get(key).await? {
kvs.push(kv);
}
}
Ok(BatchGetResponse { kvs })
}
async fn put(&self, req: PutRequest) -> Result<PutResponse, Self::Error> {
self.inner_map.insert(req.key, req.value);
// always return None as prev_kv, since we don't use it in this test.
Ok(PutResponse { prev_kv: None })
}
async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>, Self::Error> {
self.get_execute_times
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
Ok(self.inner_map.get(key).map(|v| KeyValue {
key: key.to_vec(),
value: v.value().clone(),
}))
}
async fn range(&self, _req: RangeRequest) -> Result<RangeResponse, Self::Error> {
todo!()
}
async fn batch_put(&self, _req: BatchPutRequest) -> Result<BatchPutResponse, Self::Error> {
todo!()
}
async fn compare_and_put(
&self,
_req: CompareAndPutRequest,
) -> Result<CompareAndPutResponse, Self::Error> {
todo!()
}
async fn delete_range(
&self,
_req: DeleteRangeRequest,
) -> Result<DeleteRangeResponse, Self::Error> {
todo!()
}
async fn batch_delete(
&self,
_req: BatchDeleteRequest,
) -> Result<BatchDeleteResponse, Self::Error> {
todo!()
}
}
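// `SimpleKvBackend` counts how many times `get` reaches the underlying store, so the test
// below can verify that repeated `batch_get` calls are served from the cache and that only
// cache misses hit the backend.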
#[tokio::test]
async fn test_cached_kv_backend() {
let simple_kv = Arc::new(SimpleKvBackend::default());
let get_execute_times = simple_kv.get_execute_times.clone();
let cached_kv = CachedMetaKvBackend::wrap(simple_kv);
add_some_vals(&cached_kv).await;
let batch_get_req = BatchGetRequest {
keys: vec![b"k1".to_vec(), b"k2".to_vec()],
};
assert_eq!(get_execute_times.load(Ordering::SeqCst), 0);
for _ in 0..10 {
let _batch_get_resp = cached_kv.batch_get(batch_get_req.clone()).await.unwrap();
assert_eq!(get_execute_times.load(Ordering::SeqCst), 2);
}
let batch_get_req = BatchGetRequest {
keys: vec![b"k1".to_vec(), b"k2".to_vec(), b"k3".to_vec()],
};
let _batch_get_resp = cached_kv.batch_get(batch_get_req.clone()).await.unwrap();
assert_eq!(get_execute_times.load(Ordering::SeqCst), 3);
for _ in 0..10 {
let _batch_get_resp = cached_kv.batch_get(batch_get_req.clone()).await.unwrap();
assert_eq!(get_execute_times.load(Ordering::SeqCst), 3);
}
}
async fn add_some_vals(kv_backend: &impl KvBackend) {
kv_backend
.put(PutRequest {
key: b"k1".to_vec(),
value: b"v1".to_vec(),
prev_kv: false,
})
.await
.unwrap();
kv_backend
.put(PutRequest {
key: b"k2".to_vec(),
value: b"v2".to_vec(),
prev_kv: false,
})
.await
.unwrap();
kv_backend
.put(PutRequest {
key: b"k3".to_vec(),
value: b"v3".to_vec(),
prev_kv: false,
})
.await
.unwrap();
}
}

View File

@@ -32,4 +32,6 @@ lazy_static! {
register_histogram!("greptime_catalog_kv_get_remote", "catalog kv get remote").unwrap();
pub static ref METRIC_CATALOG_KV_GET: Histogram =
register_histogram!("greptime_catalog_kv_get", "catalog kv get").unwrap();
pub static ref METRIC_CATALOG_KV_BATCH_GET: Histogram =
register_histogram!("greptime_catalog_kv_batch_get", "catalog kv batch get").unwrap();
}

View File

@@ -16,7 +16,9 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;
use catalog::kvbackend::{CachedMetaKvBackend, KvBackendCatalogManager};
use catalog::kvbackend::{
CachedMetaKvBackend, CachedMetaKvBackendBuilder, KvBackendCatalogManager,
};
use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_base::Plugins;
use common_error::ext::ErrorExt;
@@ -248,7 +250,8 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {
.context(StartMetaClientSnafu)?;
let meta_client = Arc::new(meta_client);
let cached_meta_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone()));
let cached_meta_backend =
Arc::new(CachedMetaKvBackendBuilder::new(meta_client.clone()).build());
let catalog_list =
KvBackendCatalogManager::new(cached_meta_backend.clone(), cached_meta_backend);

View File

@@ -16,7 +16,7 @@ use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use catalog::kvbackend::CachedMetaKvBackend;
use catalog::kvbackend::CachedMetaKvBackendBuilder;
use clap::Parser;
use client::client_manager::DatanodeClients;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
@@ -228,15 +228,27 @@ impl StartCommand {
let meta_client_options = opts.meta_client.as_ref().context(MissingConfigSnafu {
msg: "'meta_client'",
})?;
let cache_max_capacity = meta_client_options.metadata_cache_max_capacity;
let cache_ttl = meta_client_options.metadata_cache_ttl;
let cache_tti = meta_client_options.metadata_cache_tti;
let meta_client = FeInstance::create_meta_client(meta_client_options)
.await
.context(StartFrontendSnafu)?;
let meta_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone()));
let cached_meta_backend = CachedMetaKvBackendBuilder::new(meta_client.clone())
.cache_max_capacity(cache_max_capacity)
.cache_ttl(cache_ttl)
.cache_tti(cache_tti)
.build();
let cached_meta_backend = Arc::new(cached_meta_backend);
let executor = HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler),
Arc::new(InvalidateTableCacheHandler::new(meta_backend.clone())),
Arc::new(InvalidateTableCacheHandler::new(
cached_meta_backend.clone(),
)),
]);
let heartbeat_task = HeartbeatTask::new(
@@ -246,11 +258,11 @@ impl StartCommand {
);
let mut instance = FrontendBuilder::new(
meta_backend.clone(),
cached_meta_backend.clone(),
Arc::new(DatanodeClients::default()),
meta_client,
)
.with_cache_invalidator(meta_backend)
.with_cache_invalidator(cached_meta_backend)
.with_plugin(plugins.clone())
.with_heartbeat_task(heartbeat_task)
.try_build()

View File

@@ -36,6 +36,14 @@ pub struct MetaClientOptions {
#[serde(with = "humantime_serde")]
pub connect_timeout: Duration,
pub tcp_nodelay: bool,
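// The metadata cache options below are plumbed into `CachedMetaKvBackendBuilder`:
// `metadata_cache_max_capacity` bounds the number of cached entries, while the TTL/TTI
// durations (parsed with humantime, e.g. "10m") control expiry after write and after idle.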
#[serde(default = "default_metadata_cache_max_capacity")]
pub metadata_cache_max_capacity: u64,
#[serde(default = "default_metadata_cache_ttl")]
#[serde(with = "humantime_serde")]
pub metadata_cache_ttl: Duration,
#[serde(default = "default_metadata_cache_tti")]
#[serde(with = "humantime_serde")]
pub metadata_cache_tti: Duration,
}
fn default_heartbeat_timeout() -> Duration {
@@ -54,6 +62,18 @@ fn default_timeout() -> Duration {
Duration::from_millis(3_000u64)
}
fn default_metadata_cache_max_capacity() -> u64 {
100_000u64
}
fn default_metadata_cache_ttl() -> Duration {
Duration::from_secs(600u64)
}
fn default_metadata_cache_tti() -> Duration {
Duration::from_secs(300u64)
}
impl Default for MetaClientOptions {
fn default() -> Self {
Self {
@@ -63,6 +83,9 @@ impl Default for MetaClientOptions {
ddl_timeout: default_ddl_timeout(),
connect_timeout: default_connect_timeout(),
tcp_nodelay: true,
metadata_cache_max_capacity: default_metadata_cache_max_capacity(),
metadata_cache_ttl: default_metadata_cache_ttl(),
metadata_cache_tti: default_metadata_cache_tti(),
}
}
}

View File

@@ -20,7 +20,7 @@ use std::time::Duration;
use api::v1::meta::Role;
use api::v1::region::region_server::RegionServer;
use arrow_flight::flight_service_server::FlightServiceServer;
use catalog::kvbackend::{CachedMetaKvBackend, MetaKvBackend};
use catalog::kvbackend::{CachedMetaKvBackendBuilder, MetaKvBackend};
use client::client_manager::DatanodeClients;
use client::Client;
use common_base::Plugins;
@@ -350,11 +350,14 @@ impl GreptimeDbClusterBuilder {
meta_client.start(&[&meta_srv.server_addr]).await.unwrap();
let meta_client = Arc::new(meta_client);
let meta_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone()));
let cached_meta_backend =
Arc::new(CachedMetaKvBackendBuilder::new(meta_client.clone()).build());
let handlers_executor = HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler),
Arc::new(InvalidateTableCacheHandler::new(meta_backend.clone())),
Arc::new(InvalidateTableCacheHandler::new(
cached_meta_backend.clone(),
)),
]);
let heartbeat_task = HeartbeatTask::new(
@@ -363,12 +366,13 @@ impl GreptimeDbClusterBuilder {
Arc::new(handlers_executor),
);
let instance = FrontendBuilder::new(meta_backend.clone(), datanode_clients, meta_client)
.with_cache_invalidator(meta_backend)
.with_heartbeat_task(heartbeat_task)
.try_build()
.await
.unwrap();
let instance =
FrontendBuilder::new(cached_meta_backend.clone(), datanode_clients, meta_client)
.with_cache_invalidator(cached_meta_backend)
.with_heartbeat_task(heartbeat_task)
.try_build()
.await
.unwrap();
Arc::new(instance)
}