refactor: unify table metadata cache invalidator (#2449)

* refactor: unify table metadata cache invalidator

* chore: apply from suggestions
This commit is contained in:
Weny Xu
2023-09-21 12:45:49 +09:00
committed by GitHub
parent 20f4f7971a
commit 580d11b1e1
8 changed files with 103 additions and 97 deletions

View File

@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
pub use client::{CachedMetaKvBackend, MetaKvBackend};
mod client;
@@ -22,18 +20,3 @@ mod manager;
#[cfg(feature = "testing")]
pub mod mock;
pub use manager::KvBackendCatalogManager;
/// KvBackend cache invalidator
#[async_trait::async_trait]
pub trait KvCacheInvalidator: Send + Sync {
async fn invalidate_key(&self, key: &[u8]);
}
pub type KvCacheInvalidatorRef = Arc<dyn KvCacheInvalidator>;
pub struct DummyKvCacheInvalidator;
#[async_trait::async_trait]
impl KvCacheInvalidator for DummyKvCacheInvalidator {
async fn invalidate_key(&self, _key: &[u8]) {}
}

View File

@@ -18,6 +18,7 @@ use std::sync::Arc;
use std::time::Duration;
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::KvCacheInvalidator;
use common_meta::error::Error::{CacheNotGet, GetKvCache};
use common_meta::error::{CacheNotGetSnafu, Error, ExternalSnafu, Result};
use common_meta::kv_backend::{KvBackend, KvBackendRef, TxnService};
@@ -28,12 +29,11 @@ use common_meta::rpc::store::{
RangeRequest, RangeResponse,
};
use common_meta::rpc::KeyValue;
use common_telemetry::timer;
use common_telemetry::{debug, timer};
use meta_client::client::MetaClient;
use moka::future::{Cache, CacheBuilder};
use snafu::{OptionExt, ResultExt};
use super::KvCacheInvalidator;
use crate::metrics::{METRIC_CATALOG_KV_GET, METRIC_CATALOG_KV_REMOTE_GET};
const CACHE_MAX_CAPACITY: u64 = 10000;
@@ -197,7 +197,8 @@ impl KvBackend for CachedMetaKvBackend {
#[async_trait::async_trait]
impl KvCacheInvalidator for CachedMetaKvBackend {
async fn invalidate_key(&self, key: &[u8]) {
self.cache.invalidate(key).await
self.cache.invalidate(key).await;
debug!("invalidated cache key: {}", String::from_utf8_lossy(key));
}
}

View File

@@ -18,18 +18,17 @@ use std::sync::{Arc, Weak};
use common_catalog::consts::{DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID};
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::{CacheInvalidator, Context};
use common_meta::cache_invalidator::{
CacheInvalidator, Context, KvCacheInvalidatorRef, TableMetadataCacheInvalidator,
};
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::error::Result as MetaResult;
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::table_info::TableInfoKey;
use common_meta::key::table_name::TableNameKey;
use common_meta::key::table_route::TableRouteKey;
use common_meta::key::{TableMetaKey, TableMetadataManager, TableMetadataManagerRef};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::table_name::TableName;
use common_telemetry::debug;
use futures_util::TryStreamExt;
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
use snafu::prelude::*;
@@ -43,7 +42,6 @@ use crate::error::{
TableMetadataManagerSnafu,
};
use crate::information_schema::{InformationSchemaProvider, COLUMNS, TABLES};
use crate::kvbackend::KvCacheInvalidatorRef;
use crate::CatalogManager;
/// Access all existing catalog, schema and tables.
@@ -56,7 +54,7 @@ pub struct KvBackendCatalogManager {
// TODO(LFC): Maybe use a real implementation for Standalone mode.
// Now we use `NoopKvCacheInvalidator` for Standalone mode. In Standalone mode, the KV backend
// is implemented by RaftEngine. Maybe we need a cache for it?
backend_cache_invalidator: KvCacheInvalidatorRef,
table_metadata_cache_invalidator: TableMetadataCacheInvalidator,
partition_manager: PartitionRuleManagerRef,
table_metadata_manager: TableMetadataManagerRef,
datanode_manager: DatanodeManagerRef,
@@ -66,40 +64,16 @@ pub struct KvBackendCatalogManager {
#[async_trait::async_trait]
impl CacheInvalidator for KvBackendCatalogManager {
async fn invalidate_table_name(&self, _ctx: &Context, table_name: TableName) -> MetaResult<()> {
let key: TableNameKey = (&table_name).into();
self.backend_cache_invalidator
.invalidate_key(&key.as_raw_key())
.await;
debug!(
"invalidated cache key: {}",
String::from_utf8_lossy(&key.as_raw_key())
);
Ok(())
async fn invalidate_table_name(&self, ctx: &Context, table_name: TableName) -> MetaResult<()> {
self.table_metadata_cache_invalidator
.invalidate_table_name(ctx, table_name)
.await
}
async fn invalidate_table_id(&self, _ctx: &Context, table_id: TableId) -> MetaResult<()> {
let key = TableInfoKey::new(table_id);
self.backend_cache_invalidator
.invalidate_key(&key.as_raw_key())
.await;
debug!(
"invalidated cache key: {}",
String::from_utf8_lossy(&key.as_raw_key())
);
let key = &TableRouteKey { table_id };
self.backend_cache_invalidator
.invalidate_key(&key.as_raw_key())
.await;
debug!(
"invalidated cache key: {}",
String::from_utf8_lossy(&key.as_raw_key())
);
Ok(())
async fn invalidate_table_id(&self, ctx: &Context, table_id: TableId) -> MetaResult<()> {
self.table_metadata_cache_invalidator
.invalidate_table_id(ctx, table_id)
.await
}
}
@@ -110,9 +84,11 @@ impl KvBackendCatalogManager {
datanode_manager: DatanodeManagerRef,
) -> Arc<Self> {
Arc::new_cyclic(|me| Self {
backend_cache_invalidator,
partition_manager: Arc::new(PartitionRuleManager::new(backend.clone())),
table_metadata_manager: Arc::new(TableMetadataManager::new(backend)),
table_metadata_cache_invalidator: TableMetadataCacheInvalidator::new(
backend_cache_invalidator.clone(),
),
datanode_manager,
system_catalog: SystemCatalog {
catalog_manager: me.clone(),
@@ -133,9 +109,9 @@ impl KvBackendCatalogManager {
}
pub async fn invalidate_schema(&self, catalog: &str, schema: &str) {
let key = SchemaNameKey::new(catalog, schema).as_raw_key();
self.backend_cache_invalidator.invalidate_key(&key).await;
self.table_metadata_cache_invalidator
.invalidate_schema(catalog, schema)
.await
}
}

View File

@@ -14,11 +14,12 @@
use std::sync::Arc;
use catalog::kvbackend::{DummyKvCacheInvalidator, KvBackendCatalogManager};
use catalog::kvbackend::KvBackendCatalogManager;
use catalog::CatalogManagerRef;
use clap::Parser;
use common_base::Plugins;
use common_config::{kv_store_dir, KvStoreConfig, WalConfig};
use common_meta::cache_invalidator::DummyKvCacheInvalidator;
use common_meta::kv_backend::KvBackendRef;
use common_procedure::ProcedureManagerRef;
use common_telemetry::info;

View File

@@ -17,8 +17,28 @@ use std::sync::Arc;
use table::metadata::TableId;
use crate::error::Result;
use crate::key::schema_name::SchemaNameKey;
use crate::key::table_info::TableInfoKey;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteKey;
use crate::key::TableMetaKey;
use crate::table_name::TableName;
/// KvBackend cache invalidator
#[async_trait::async_trait]
pub trait KvCacheInvalidator: Send + Sync {
async fn invalidate_key(&self, key: &[u8]);
}
pub type KvCacheInvalidatorRef = Arc<dyn KvCacheInvalidator>;
pub struct DummyKvCacheInvalidator;
#[async_trait::async_trait]
impl KvCacheInvalidator for DummyKvCacheInvalidator {
async fn invalidate_key(&self, _key: &[u8]) {}
}
/// Places context of invalidating cache. e.g., span id, trace id etc.
#[derive(Default)]
pub struct Context {
@@ -47,3 +67,38 @@ impl CacheInvalidator for DummyCacheInvalidator {
Ok(())
}
}
#[derive(Clone)]
pub struct TableMetadataCacheInvalidator(KvCacheInvalidatorRef);
impl TableMetadataCacheInvalidator {
pub fn new(kv_cache_invalidator: KvCacheInvalidatorRef) -> Self {
Self(kv_cache_invalidator)
}
pub async fn invalidate_schema(&self, catalog: &str, schema: &str) {
let key = SchemaNameKey::new(catalog, schema).as_raw_key();
self.0.invalidate_key(&key).await;
}
}
#[async_trait::async_trait]
impl CacheInvalidator for TableMetadataCacheInvalidator {
async fn invalidate_table_name(&self, _ctx: &Context, table_name: TableName) -> Result<()> {
let key: TableNameKey = (&table_name).into();
self.0.invalidate_key(&key.as_raw_key()).await;
Ok(())
}
async fn invalidate_table_id(&self, _ctx: &Context, table_id: TableId) -> Result<()> {
let key = TableInfoKey::new(table_id);
self.0.invalidate_key(&key.as_raw_key()).await;
let key = &TableRouteKey { table_id };
self.0.invalidate_key(&key.as_raw_key()).await;
Ok(())
}
}

View File

@@ -13,24 +13,20 @@
// limitations under the License.
use async_trait::async_trait;
use catalog::kvbackend::KvCacheInvalidatorRef;
use common_meta::cache_invalidator::{
CacheInvalidator, Context, KvCacheInvalidatorRef, TableMetadataCacheInvalidator,
};
use common_meta::error::Result as MetaResult;
use common_meta::heartbeat::handler::{
HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
};
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_meta::key::table_info::TableInfoKey;
use common_meta::key::table_name::TableNameKey;
use common_meta::key::table_route::TableRouteKey;
use common_meta::key::TableMetaKey;
use common_meta::table_name::TableName;
use common_telemetry::error;
use futures::future::Either;
use table::metadata::TableId;
#[derive(Clone)]
pub struct InvalidateTableCacheHandler {
backend_cache_invalidator: KvCacheInvalidatorRef,
table_metadata_cache_invalidator: TableMetadataCacheInvalidator,
}
#[async_trait]
@@ -45,24 +41,31 @@ impl HeartbeatResponseHandler for InvalidateTableCacheHandler {
async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult<HandleControl> {
let mailbox = ctx.mailbox.clone();
let self_ref = self.clone();
let cache_invalidator = self.table_metadata_cache_invalidator.clone();
let (meta, invalidator) = match ctx.incoming_message.take() {
Some((meta, Instruction::InvalidateTableIdCache(table_id))) => (
meta,
Either::Left(async move { self_ref.invalidate_table_id_cache(table_id).await }),
Either::Left(async move {
cache_invalidator
.invalidate_table_id(&Context::default(), table_id)
.await
}),
),
Some((meta, Instruction::InvalidateTableNameCache(table_name))) => (
meta,
Either::Right(
async move { self_ref.invalidate_table_name_cache(table_name).await },
),
Either::Right(async move {
cache_invalidator
.invalidate_table_name(&Context::default(), table_name)
.await
}),
),
_ => unreachable!("InvalidateTableCacheHandler: should be guarded by 'is_acceptable'"),
};
let _handle = common_runtime::spawn_bg(async move {
invalidator.await;
// Local cache invalidation always succeeds.
let _ = invalidator.await;
if let Err(e) = mailbox
.send((
@@ -85,23 +88,9 @@ impl HeartbeatResponseHandler for InvalidateTableCacheHandler {
impl InvalidateTableCacheHandler {
pub fn new(backend_cache_invalidator: KvCacheInvalidatorRef) -> Self {
Self {
backend_cache_invalidator,
table_metadata_cache_invalidator: TableMetadataCacheInvalidator::new(
backend_cache_invalidator,
),
}
}
async fn invalidate_table_id_cache(&self, table_id: TableId) {
self.backend_cache_invalidator
.invalidate_key(&TableInfoKey::new(table_id).as_raw_key())
.await;
self.backend_cache_invalidator
.invalidate_key(&TableRouteKey { table_id }.as_raw_key())
.await;
}
async fn invalidate_table_name_cache(&self, table_name: TableName) {
self.backend_cache_invalidator
.invalidate_key(&TableNameKey::from(&table_name).as_raw_key())
.await;
}
}

View File

@@ -17,7 +17,7 @@ use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use api::v1::meta::HeartbeatResponse;
use catalog::kvbackend::KvCacheInvalidator;
use common_meta::cache_invalidator::KvCacheInvalidator;
use common_meta::heartbeat::handler::{
HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor,
};

View File

@@ -14,9 +14,10 @@
use std::sync::Arc;
use catalog::kvbackend::{DummyKvCacheInvalidator, KvBackendCatalogManager};
use catalog::kvbackend::KvBackendCatalogManager;
use common_base::Plugins;
use common_config::KvStoreConfig;
use common_meta::cache_invalidator::DummyKvCacheInvalidator;
use common_procedure::options::ProcedureConfig;
use datanode::config::DatanodeOptions;
use datanode::datanode::DatanodeBuilder;