feat: add cache for catalog kv backend (#1592)

* feat: add kvbackend cache

* fix: cargo fmt
This commit is contained in:
fys
2023-05-24 15:07:29 +08:00
committed by GitHub
parent ddca0307d1
commit fa4a497d75
14 changed files with 323 additions and 39 deletions

62
Cargo.lock generated
View File

@@ -1204,6 +1204,7 @@ dependencies = [
"meta-client",
"metrics",
"mito",
"moka 0.11.0",
"object-store",
"parking_lot",
"regex",
@@ -3113,7 +3114,7 @@ dependencies = [
"meter-core",
"meter-macros",
"mito",
"moka",
"moka 0.9.7",
"object-store",
"openmetrics-parser",
"partition",
@@ -4775,6 +4776,15 @@ dependencies = [
"libc",
]
[[package]]
name = "mach2"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d0d1830bcd151a6fc4aea1369af235b36c1528fe976b8ff678683c9995eade8"
dependencies = [
"libc",
]
[[package]]
name = "maplit"
version = "1.0.2"
@@ -4985,7 +4995,7 @@ dependencies = [
"metrics-util",
"parking_lot",
"portable-atomic",
"quanta",
"quanta 0.10.1",
"thiserror",
]
@@ -5016,7 +5026,7 @@ dependencies = [
"ordered-float 2.10.0",
"parking_lot",
"portable-atomic",
"quanta",
"quanta 0.10.1",
"radix_trie",
"sketches-ddsketch",
]
@@ -5115,7 +5125,33 @@ dependencies = [
"num_cpus",
"once_cell",
"parking_lot",
"quanta",
"quanta 0.10.1",
"rustc_version 0.4.0",
"scheduled-thread-pool",
"skeptic",
"smallvec",
"tagptr",
"thiserror",
"triomphe",
"uuid",
]
[[package]]
name = "moka"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "934030d03f6191edbb4ba16835ccdb80d560788ac686570a8e2986a0fb59ded8"
dependencies = [
"async-io",
"async-lock",
"crossbeam-channel",
"crossbeam-epoch",
"crossbeam-utils",
"futures-util",
"num_cpus",
"once_cell",
"parking_lot",
"quanta 0.11.0",
"rustc_version 0.4.0",
"scheduled-thread-pool",
"skeptic",
@@ -5847,7 +5883,7 @@ dependencies = [
"datafusion-expr",
"datatypes",
"meta-client",
"moka",
"moka 0.9.7",
"serde",
"serde_json",
"snafu",
@@ -6578,6 +6614,22 @@ dependencies = [
"winapi",
]
[[package]]
name = "quanta"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8cc73c42f9314c4bdce450c77e6f09ecbddefbeddb1b5979ded332a3913ded33"
dependencies = [
"crossbeam-utils",
"libc",
"mach2",
"once_cell",
"raw-cpuid",
"wasi 0.11.0+wasi-snapshot-preview1",
"web-sys",
"winapi",
]
[[package]]
name = "query"
version = "0.2.0"

View File

@@ -29,6 +29,7 @@ key-lock = "0.1"
lazy_static = "1.4"
meta-client = { path = "../meta-client" }
metrics.workspace = true
moka = { version = "0.11", features = ["future"] }
parking_lot = "0.12"
regex = "1.6"
serde = "1.0"

View File

@@ -233,6 +233,9 @@ pub enum Error {
#[snafu(backtrace)]
source: table::error::Error,
},
#[snafu(display("A generic error has occurred, msg: {}", msg))]
Generic { msg: String, location: Location },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -253,7 +256,7 @@ impl ErrorExt for Error {
| Error::EmptyValue { .. }
| Error::ValueDeserialize { .. } => StatusCode::StorageUnavailable,
Error::SystemCatalogTypeMismatch { .. } => StatusCode::Internal,
Error::Generic { .. } | Error::SystemCatalogTypeMismatch { .. } => StatusCode::Internal,
Error::ReadSystemCatalog { source, .. } | Error::CreateRecordBatch { source } => {
source.status_code()

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#![feature(trait_upcasting)]
#![feature(assert_matches)]
use std::any::Any;

View File

@@ -20,6 +20,9 @@ pub(crate) const METRIC_CATALOG_MANAGER_CATALOG_COUNT: &str = "catalog.catalog_c
pub(crate) const METRIC_CATALOG_MANAGER_SCHEMA_COUNT: &str = "catalog.schema_count";
pub(crate) const METRIC_CATALOG_MANAGER_TABLE_COUNT: &str = "catalog.table_count";
pub(crate) const METRIC_CATALOG_KV_REMOTE_GET: &str = "catalog.kv.get.remote";
pub(crate) const METRIC_CATALOG_KV_GET: &str = "catalog.kv.get";
#[inline]
pub(crate) fn db_label(catalog: &str, schema: &str) -> (&'static str, String) {
(METRIC_DB_LABEL, build_db_string(catalog, schema))

View File

@@ -16,7 +16,7 @@ use std::fmt::Debug;
use std::pin::Pin;
use std::sync::Arc;
pub use client::MetaKvBackend;
pub use client::CachedMetaKvBackend;
use futures::Stream;
use futures_util::StreamExt;
pub use manager::{RemoteCatalogManager, RemoteCatalogProvider, RemoteSchemaProvider};
@@ -74,6 +74,13 @@ pub trait KvBackend: Send + Sync {
pub type KvBackendRef = Arc<dyn KvBackend>;
/// Hook for evicting individual keys from a cache that sits in front of a `KvBackend`.
///
/// It lets callers drop a cached entry explicitly when the underlying data was changed
/// through a path that does not go through the caching backend itself.
#[async_trait::async_trait]
pub trait KvCacheInvalidator: Send + Sync {
/// Invalidates whatever is cached under `key`.
async fn invalidate_key(&self, key: &[u8]);
}
/// Shared, dynamically dispatched handle to a [`KvCacheInvalidator`].
pub type KvCacheInvalidatorRef = Arc<dyn KvCacheInvalidator>;
#[cfg(test)]
mod tests {
use async_stream::stream;
@@ -119,12 +126,16 @@ mod tests {
// Exercises `get` on `MockKvBackend`: keys "0", "1" and "2" are expected to resolve to an
// entry whose key equals the requested key, while "3" is expected to be absent.
#[tokio::test]
async fn test_get() {
let backend = MockKvBackend {};
let result = backend.get(0.to_string().as_bytes()).await;
assert_eq!(0.to_string().as_bytes(), result.unwrap().unwrap().0);
let result = backend.get(1.to_string().as_bytes()).await;
assert_eq!(1.to_string().as_bytes(), result.unwrap().unwrap().0);
let result = backend.get(2.to_string().as_bytes()).await;
assert_eq!(2.to_string().as_bytes(), result.unwrap().unwrap().0);
let result = backend.get(3.to_string().as_bytes()).await;
assert!(result.unwrap().is_none());
}

View File

@@ -14,15 +14,124 @@
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;
use async_stream::stream;
use common_meta::rpc::store::{CompareAndPutRequest, DeleteRangeRequest, PutRequest, RangeRequest};
use common_telemetry::info;
use common_telemetry::{info, timer};
use meta_client::client::MetaClient;
use moka::future::{Cache, CacheBuilder};
use snafu::ResultExt;
use crate::error::{Error, MetaSrvSnafu};
use crate::remote::{Kv, KvBackend, ValueIter};
use super::KvCacheInvalidator;
use crate::error::{Error, GenericSnafu, MetaSrvSnafu, Result};
use crate::metrics::{METRIC_CATALOG_KV_GET, METRIC_CATALOG_KV_REMOTE_GET};
use crate::remote::{Kv, KvBackend, KvBackendRef, ValueIter};
/// Capacity handed to `CacheBuilder::new` for the kv cache.
const CACHE_MAX_CAPACITY: u64 = 10000;
/// Time-to-live of a cached entry, in seconds (10 minutes).
const CACHE_TTL_SECOND: u64 = 10 * 60;
/// Time-to-idle of a cached entry, in seconds (5 minutes).
const CACHE_TTI_SECOND: u64 = 5 * 60;
/// A `KvBackend` decorator that keeps point lookups (`get`) in an in-memory cache.
///
/// Writes are forwarded to the wrapped backend and, when they succeed, the affected key
/// is dropped from the cache.
pub struct CachedMetaKvBackend {
// The backend every operation is ultimately forwarded to.
kv_backend: KvBackendRef,
// Lookup results keyed by raw key bytes. The value is an `Option`, so a lookup that
// found nothing is cached as well.
cache: Arc<Cache<Vec<u8>, Option<Kv>>>,
}
#[async_trait::async_trait]
impl KvBackend for CachedMetaKvBackend {
    /// Range scans are not cached; they are always served by the wrapped backend.
    fn range<'a, 'b>(&'a self, key: &[u8]) -> ValueIter<'b, Error>
    where
        'a: 'b,
    {
        self.kv_backend.range(key)
    }

    /// Returns the entry stored under `key`, consulting the cache first and falling back
    /// to the wrapped backend on a miss.
    ///
    /// # Errors
    ///
    /// A failure of the wrapped backend is reported as `Error::Generic` carrying the
    /// original error's message.
    async fn get(&self, key: &[u8]) -> Result<Option<Kv>> {
        let _timer = timer!(METRIC_CATALOG_KV_GET);

        // Only evaluated by the cache when the key is not present, so the "remote" timer
        // measures real backend round trips only.
        let init = async {
            let _timer = timer!(METRIC_CATALOG_KV_REMOTE_GET);
            self.kv_backend.get(key).await
        };

        // The cache reports loader failures through its own wrapper type, so the error is
        // flattened into a message here.
        self.cache
            .try_get_with_by_ref(key, init)
            .await
            .map_err(|e| GenericSnafu { msg: e.to_string() }.build())
    }

    /// Stores `val` under `key` in the wrapped backend and, on success, evicts the key
    /// from the cache so the next `get` observes the new value.
    async fn set(&self, key: &[u8], val: &[u8]) -> Result<()> {
        let ret = self.kv_backend.set(key, val).await;

        if ret.is_ok() {
            self.invalidate_key(key).await;
        }

        ret
    }

    /// Deletes the single entry `key` from the wrapped backend (an empty range end is
    /// passed to `delete_range`) and, on success, evicts it from the cache.
    async fn delete(&self, key: &[u8]) -> Result<()> {
        let ret = self.kv_backend.delete_range(key, &[]).await;

        if ret.is_ok() {
            self.invalidate_key(key).await;
        }

        ret
    }

    /// Deletes the range `[key, end)` from the wrapped backend and, on success, drops the
    /// affected entries from the cache.
    ///
    /// An empty `end` is treated as a single-key delete, mirroring how `delete` calls the
    /// wrapped backend. For a real range the cache cannot cheaply tell which keys fall
    /// inside it, so the whole cache is invalidated; this is conservative but never
    /// serves a deleted entry.
    async fn delete_range(&self, key: &[u8], end: &[u8]) -> Result<()> {
        let ret = self.kv_backend.delete_range(key, end).await;

        if ret.is_ok() {
            if end.is_empty() {
                self.invalidate_key(key).await;
            } else {
                self.cache.invalidate_all();
            }
        }

        ret
    }

    /// Forwards the compare-and-set to the wrapped backend and evicts `key` from the
    /// cache whenever the call itself succeeded.
    ///
    /// The key is evicted even when the comparison failed (inner `Err`); that only costs
    /// one extra backend lookup and keeps the logic simple.
    async fn compare_and_set(
        &self,
        key: &[u8],
        expect: &[u8],
        val: &[u8],
    ) -> Result<std::result::Result<(), Option<Vec<u8>>>> {
        let ret = self.kv_backend.compare_and_set(key, expect, val).await;

        if ret.is_ok() {
            self.invalidate_key(key).await;
        }

        ret
    }
}
#[async_trait::async_trait]
impl KvCacheInvalidator for CachedMetaKvBackend {
    /// Evicts the cached lookup result for `key`, so the next `get` for that key goes
    /// back to the wrapped backend.
    async fn invalidate_key(&self, key: &[u8]) {
        let cache = &self.cache;
        cache.invalidate(key).await;
    }
}
impl CachedMetaKvBackend {
    /// Creates a cached backend on top of a `MetaKvBackend` that talks to `client`.
    pub fn new(client: Arc<MetaClient>) -> Self {
        // Delegate to `wrap` so both constructors share a single cache configuration.
        Self::wrap(Arc::new(MetaKvBackend { client }))
    }

    /// Puts a cache in front of an arbitrary `kv_backend`.
    ///
    /// The cache is bounded by `CACHE_MAX_CAPACITY` and expires entries according to
    /// `CACHE_TTL_SECOND` (time to live) and `CACHE_TTI_SECOND` (time to idle).
    pub fn wrap(kv_backend: KvBackendRef) -> Self {
        let cache = Arc::new(
            CacheBuilder::new(CACHE_MAX_CAPACITY)
                .time_to_live(Duration::from_secs(CACHE_TTL_SECOND))
                .time_to_idle(Duration::from_secs(CACHE_TTI_SECOND))
                .build(),
        );

        Self { kv_backend, cache }
    }
}
#[derive(Debug)]
pub struct MetaKvBackend {
pub client: Arc<MetaClient>,
@@ -51,7 +160,7 @@ impl KvBackend for MetaKvBackend {
}))
}
async fn get(&self, key: &[u8]) -> Result<Option<Kv>, Error> {
async fn get(&self, key: &[u8]) -> Result<Option<Kv>> {
let mut response = self
.client
.range(RangeRequest::new().with_key(key))
@@ -63,7 +172,7 @@ impl KvBackend for MetaKvBackend {
.map(|kv| Kv(kv.take_key(), kv.take_value())))
}
async fn set(&self, key: &[u8], val: &[u8]) -> Result<(), Error> {
async fn set(&self, key: &[u8], val: &[u8]) -> Result<()> {
let req = PutRequest::new()
.with_key(key.to_vec())
.with_value(val.to_vec());
@@ -71,7 +180,7 @@ impl KvBackend for MetaKvBackend {
Ok(())
}
async fn delete_range(&self, key: &[u8], end: &[u8]) -> Result<(), Error> {
async fn delete_range(&self, key: &[u8], end: &[u8]) -> Result<()> {
let req = DeleteRangeRequest::new().with_range(key.to_vec(), end.to_vec());
let resp = self.client.delete_range(req).await.context(MetaSrvSnafu)?;
info!(
@@ -89,7 +198,7 @@ impl KvBackend for MetaKvBackend {
key: &[u8],
expect: &[u8],
val: &[u8],
) -> Result<Result<(), Option<Vec<u8>>>, Error> {
) -> Result<std::result::Result<(), Option<Vec<u8>>>> {
let request = CompareAndPutRequest::new()
.with_key(key.to_vec())
.with_expect(expect.to_vec())

View File

@@ -139,12 +139,16 @@ impl KvBackend for MockKvBackend {
}
async fn delete_range(&self, key: &[u8], end: &[u8]) -> Result<(), Error> {
let start = key.to_vec();
let end = end.to_vec();
let range = start..end;
let mut map = self.map.write().await;
map.retain(|k, _| !range.contains(k));
if end.is_empty() {
let _ = map.remove(key);
} else {
let start = key.to_vec();
let end = end.to_vec();
let range = start..end;
map.retain(|k, _| !range.contains(k));
}
Ok(())
}
}

View File

@@ -24,7 +24,8 @@ mod tests {
use catalog::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue};
use catalog::remote::{
KvBackend, KvBackendRef, RemoteCatalogManager, RemoteCatalogProvider, RemoteSchemaProvider,
CachedMetaKvBackend, KvBackend, KvBackendRef, RemoteCatalogManager, RemoteCatalogProvider,
RemoteSchemaProvider,
};
use catalog::{CatalogManager, RegisterTableRequest};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE};
@@ -76,6 +77,52 @@ mod tests {
);
}
// Verifies that `CachedMetaKvBackend` never serves a stale value: after every successful
// write (`set`, `compare_and_set`, `delete`) the following `get` must observe the change.
#[tokio::test]
async fn test_cached_backend() {
    common_telemetry::init_default_ut_logging();
    let backend = CachedMetaKvBackend::wrap(Arc::new(MockKvBackend::default()));

    let default_catalog_key = CatalogKey {
        catalog_name: DEFAULT_CATALOG_NAME.to_string(),
    }
    .to_string();
    // Raw form of the key used for every subsequent lookup.
    let key: &[u8] = b"__c-greptime";

    backend
        .set(
            default_catalog_key.as_bytes(),
            &CatalogValue {}.as_bytes().unwrap(),
        )
        .await
        .unwrap();

    // First read populates the cache.
    let ret = backend.get(key).await.unwrap();
    assert!(ret.is_some());

    // The swap itself must succeed, not merely the call; otherwise the value check below
    // would fail for an unrelated reason.
    let swapped = backend
        .compare_and_set(key, &CatalogValue {}.as_bytes().unwrap(), b"123")
        .await
        .unwrap();
    assert!(swapped.is_ok());

    let ret = backend.get(key).await.unwrap();
    assert!(ret.is_some());
    assert_eq!(&b"123"[..], &(ret.as_ref().unwrap().1));

    // Do not swallow a failed write: the assertion below relies on it having happened.
    backend.set(key, b"1234").await.unwrap();

    let ret = backend.get(key).await.unwrap();
    assert!(ret.is_some());
    assert_eq!(&b"1234"[..], &(ret.as_ref().unwrap().1));

    backend.delete(key).await.unwrap();

    let ret = backend.get(key).await.unwrap();
    assert!(ret.is_none());
}
async fn prepare_components(
node_id: u64,
) -> (
@@ -84,17 +131,22 @@ mod tests {
Arc<RemoteCatalogManager>,
TableEngineManagerRef,
) {
let backend = Arc::new(MockKvBackend::default()) as KvBackendRef;
let cached_backend = Arc::new(CachedMetaKvBackend::wrap(
Arc::new(MockKvBackend::default()),
));
let table_engine = Arc::new(MockTableEngine::default());
let engine_manager = Arc::new(MemoryTableEngineManager::alias(
MITO_ENGINE.to_string(),
table_engine.clone(),
));
let catalog_manager =
RemoteCatalogManager::new(engine_manager.clone(), node_id, backend.clone());
RemoteCatalogManager::new(engine_manager.clone(), node_id, cached_backend.clone());
catalog_manager.start().await.unwrap();
(
backend,
cached_backend,
table_engine,
Arc::new(catalog_manager),
engine_manager as Arc<_>,

View File

@@ -16,7 +16,7 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;
use catalog::remote::MetaKvBackend;
use catalog::remote::CachedMetaKvBackend;
use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::prelude::ErrorExt;
use common_query::Output;
@@ -253,9 +253,7 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {
.context(StartMetaClientSnafu)?;
let meta_client = Arc::new(meta_client);
let backend = Arc::new(MetaKvBackend {
client: meta_client.clone(),
});
let cached_meta_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone()));
let table_routes = Arc::new(TableRoutes::new(meta_client));
let partition_manager = Arc::new(PartitionRuleManager::new(table_routes));
@@ -263,7 +261,8 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {
let datanode_clients = Arc::new(DatanodeClients::default());
let catalog_list = Arc::new(FrontendCatalogManager::new(
backend,
cached_meta_backend.clone(),
cached_meta_backend,
partition_manager,
datanode_clients,
));

View File

@@ -18,7 +18,7 @@ use std::time::Duration;
use std::{fs, path};
use api::v1::meta::Role;
use catalog::remote::MetaKvBackend;
use catalog::remote::CachedMetaKvBackend;
use catalog::{CatalogManager, CatalogManagerRef, RegisterTableRequest};
use common_base::paths::{CLUSTER_DIR, WAL_DIR};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
@@ -186,12 +186,14 @@ impl Instance {
}
Mode::Distributed => {
let kv_backend = Arc::new(CachedMetaKvBackend::new(
meta_client.as_ref().unwrap().clone(),
));
let catalog = Arc::new(catalog::remote::RemoteCatalogManager::new(
engine_manager.clone(),
opts.node_id.context(MissingNodeIdSnafu)?,
Arc::new(MetaKvBackend {
client: meta_client.as_ref().unwrap().clone(),
}),
kv_backend,
));
(catalog as CatalogManagerRef, None)
}

View File

@@ -26,7 +26,7 @@ use catalog::helper::{
build_catalog_prefix, build_schema_prefix, build_table_global_prefix, CatalogKey, SchemaKey,
TableGlobalKey, TableGlobalValue,
};
use catalog::remote::{Kv, KvBackendRef};
use catalog::remote::{Kv, KvBackendRef, KvCacheInvalidatorRef};
use catalog::{
CatalogManager, CatalogProvider, CatalogProviderRef, DeregisterTableRequest,
RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest,
@@ -51,6 +51,7 @@ use crate::table::DistTable;
#[derive(Clone)]
pub struct FrontendCatalogManager {
backend: KvBackendRef,
backend_cache_invalidtor: KvCacheInvalidatorRef,
partition_manager: PartitionRuleManagerRef,
datanode_clients: Arc<DatanodeClients>,
@@ -64,11 +65,13 @@ pub struct FrontendCatalogManager {
impl FrontendCatalogManager {
pub fn new(
backend: KvBackendRef,
backend_cache_invalidtor: KvCacheInvalidatorRef,
partition_manager: PartitionRuleManagerRef,
datanode_clients: Arc<DatanodeClients>,
) -> Self {
Self {
backend,
backend_cache_invalidtor,
partition_manager,
datanode_clients,
dist_instance: None,
@@ -90,6 +93,19 @@ impl FrontendCatalogManager {
pub fn datanode_clients(&self) -> Arc<DatanodeClients> {
self.datanode_clients.clone()
}
/// Evicts the cached global-table entry of `catalog.schema.table` from the backend cache.
///
/// The cache key is the string form of the corresponding `TableGlobalKey`.
pub async fn invalidate_table(&self, catalog: &str, schema: &str, table: &str) {
    let global_key = TableGlobalKey {
        catalog_name: catalog.into(),
        schema_name: schema.into(),
        table_name: table.into(),
    };
    let encoded = global_key.to_string();

    self.backend_cache_invalidtor
        .invalidate_key(encoded.as_bytes())
        .await;
}
}
// FIXME(hl): Frontend only needs a CatalogList, should replace with trait upcasting

View File

@@ -30,7 +30,7 @@ use api::v1::greptime_request::Request;
use api::v1::meta::Role;
use api::v1::{AddColumns, AlterExpr, Column, DdlRequest, InsertRequest};
use async_trait::async_trait;
use catalog::remote::MetaKvBackend;
use catalog::remote::CachedMetaKvBackend;
use catalog::CatalogManagerRef;
use common_base::Plugins;
use common_catalog::consts::MITO_ENGINE;
@@ -137,14 +137,16 @@ impl Instance {
datanode_clients: Arc<DatanodeClients>,
plugins: Arc<Plugins>,
) -> Result<Self> {
let meta_backend = Arc::new(MetaKvBackend {
client: meta_client.clone(),
});
let meta_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone()));
let table_routes = Arc::new(TableRoutes::new(meta_client.clone()));
let partition_manager = Arc::new(PartitionRuleManager::new(table_routes));
let mut catalog_manager =
FrontendCatalogManager::new(meta_backend, partition_manager, datanode_clients.clone());
let mut catalog_manager = FrontendCatalogManager::new(
meta_backend.clone(),
meta_backend,
partition_manager,
datanode_clients.clone(),
);
let dist_instance = DistInstance::new(
meta_client.clone(),

View File

@@ -202,6 +202,19 @@ impl DistInstance {
.await
.context(RequestDatanodeSnafu)?;
}
// The table information created on meta does not go through KvBackend, so we
// manually invalidate the cache here.
//
// TODO(fys): when the meta invalidation cache mechanism is established, remove it.
self.catalog_manager
.invalidate_table(
&table_name.catalog_name,
&table_name.schema_name,
&table_name.table_name,
)
.await;
Ok(table)
}
@@ -260,6 +273,18 @@ impl DistInstance {
}
}
// The table information dropped on meta does not go through KvBackend, so we
// manually invalidate the cache here.
//
// TODO(fys): when the meta invalidation cache mechanism is established, remove it.
self.catalog_manager()
.invalidate_table(
&table_name.catalog_name,
&table_name.schema_name,
&table_name.table_name,
)
.await;
Ok(Output::AffectedRows(1))
}
@@ -470,12 +495,15 @@ impl DistInstance {
} else {
expr.catalog_name.as_str()
};
let schema_name = if expr.schema_name.is_empty() {
DEFAULT_SCHEMA_NAME
} else {
expr.schema_name.as_str()
};
let table_name = expr.table_name.as_str();
let table = self
.catalog_manager
.table(catalog_name, schema_name, table_name)
@@ -489,6 +517,7 @@ impl DistInstance {
.context(AlterExprToRequestSnafu)?;
let mut context = AlterContext::with_capacity(1);
context.insert(expr);
table.alter(context, &request).await.context(TableSnafu)?;