mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-20 15:00:40 +00:00
refactor: refactor cache invalidator (#2540)
This commit is contained in:
@@ -18,9 +18,7 @@ use std::sync::{Arc, Weak};
|
||||
|
||||
use common_catalog::consts::{DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, NUMBERS_TABLE_ID};
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::cache_invalidator::{
|
||||
CacheInvalidator, Context, KvCacheInvalidatorRef, TableMetadataCacheInvalidator,
|
||||
};
|
||||
use common_meta::cache_invalidator::{CacheInvalidator, CacheInvalidatorRef, Context};
|
||||
use common_meta::datanode_manager::DatanodeManagerRef;
|
||||
use common_meta::error::Result as MetaResult;
|
||||
use common_meta::key::catalog_name::CatalogNameKey;
|
||||
@@ -54,7 +52,7 @@ pub struct KvBackendCatalogManager {
|
||||
// TODO(LFC): Maybe use a real implementation for Standalone mode.
|
||||
// Now we use `NoopKvCacheInvalidator` for Standalone mode. In Standalone mode, the KV backend
|
||||
// is implemented by RaftEngine. Maybe we need a cache for it?
|
||||
table_metadata_cache_invalidator: TableMetadataCacheInvalidator,
|
||||
cache_invalidator: CacheInvalidatorRef,
|
||||
partition_manager: PartitionRuleManagerRef,
|
||||
table_metadata_manager: TableMetadataManagerRef,
|
||||
datanode_manager: DatanodeManagerRef,
|
||||
@@ -65,13 +63,13 @@ pub struct KvBackendCatalogManager {
|
||||
#[async_trait::async_trait]
|
||||
impl CacheInvalidator for KvBackendCatalogManager {
|
||||
async fn invalidate_table_name(&self, ctx: &Context, table_name: TableName) -> MetaResult<()> {
|
||||
self.table_metadata_cache_invalidator
|
||||
self.cache_invalidator
|
||||
.invalidate_table_name(ctx, table_name)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn invalidate_table_id(&self, ctx: &Context, table_id: TableId) -> MetaResult<()> {
|
||||
self.table_metadata_cache_invalidator
|
||||
self.cache_invalidator
|
||||
.invalidate_table_id(ctx, table_id)
|
||||
.await
|
||||
}
|
||||
@@ -80,15 +78,13 @@ impl CacheInvalidator for KvBackendCatalogManager {
|
||||
impl KvBackendCatalogManager {
|
||||
pub fn new(
|
||||
backend: KvBackendRef,
|
||||
backend_cache_invalidator: KvCacheInvalidatorRef,
|
||||
cache_invalidator: CacheInvalidatorRef,
|
||||
datanode_manager: DatanodeManagerRef,
|
||||
) -> Arc<Self> {
|
||||
Arc::new_cyclic(|me| Self {
|
||||
partition_manager: Arc::new(PartitionRuleManager::new(backend.clone())),
|
||||
table_metadata_manager: Arc::new(TableMetadataManager::new(backend)),
|
||||
table_metadata_cache_invalidator: TableMetadataCacheInvalidator::new(
|
||||
backend_cache_invalidator.clone(),
|
||||
),
|
||||
cache_invalidator,
|
||||
datanode_manager,
|
||||
system_catalog: SystemCatalog {
|
||||
catalog_manager: me.clone(),
|
||||
@@ -107,12 +103,6 @@ impl KvBackendCatalogManager {
|
||||
pub fn datanode_manager(&self) -> DatanodeManagerRef {
|
||||
self.datanode_manager.clone()
|
||||
}
|
||||
|
||||
pub async fn invalidate_schema(&self, catalog: &str, schema: &str) {
|
||||
self.table_metadata_cache_invalidator
|
||||
.invalidate_schema(catalog, schema)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
|
||||
@@ -17,7 +17,6 @@ use std::sync::Arc;
|
||||
use table::metadata::TableId;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::key::schema_name::SchemaNameKey;
|
||||
use crate::key::table_info::TableInfoKey;
|
||||
use crate::key::table_name::TableNameKey;
|
||||
use crate::key::table_route::TableRouteKey;
|
||||
@@ -68,36 +67,25 @@ impl CacheInvalidator for DummyCacheInvalidator {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct TableMetadataCacheInvalidator(KvCacheInvalidatorRef);
|
||||
|
||||
impl TableMetadataCacheInvalidator {
|
||||
pub fn new(kv_cache_invalidator: KvCacheInvalidatorRef) -> Self {
|
||||
Self(kv_cache_invalidator)
|
||||
}
|
||||
|
||||
pub async fn invalidate_schema(&self, catalog: &str, schema: &str) {
|
||||
let key = SchemaNameKey::new(catalog, schema).as_raw_key();
|
||||
self.0.invalidate_key(&key).await;
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl CacheInvalidator for TableMetadataCacheInvalidator {
|
||||
impl<T> CacheInvalidator for T
|
||||
where
|
||||
T: KvCacheInvalidator,
|
||||
{
|
||||
async fn invalidate_table_name(&self, _ctx: &Context, table_name: TableName) -> Result<()> {
|
||||
let key: TableNameKey = (&table_name).into();
|
||||
|
||||
self.0.invalidate_key(&key.as_raw_key()).await;
|
||||
self.invalidate_key(&key.as_raw_key()).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn invalidate_table_id(&self, _ctx: &Context, table_id: TableId) -> Result<()> {
|
||||
let key = TableInfoKey::new(table_id);
|
||||
self.0.invalidate_key(&key.as_raw_key()).await;
|
||||
self.invalidate_key(&key.as_raw_key()).await;
|
||||
|
||||
let key = &TableRouteKey { table_id };
|
||||
self.0.invalidate_key(&key.as_raw_key()).await;
|
||||
self.invalidate_key(&key.as_raw_key()).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -13,9 +13,7 @@
|
||||
// limitations under the License.
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common_meta::cache_invalidator::{
|
||||
CacheInvalidator, Context, KvCacheInvalidatorRef, TableMetadataCacheInvalidator,
|
||||
};
|
||||
use common_meta::cache_invalidator::{CacheInvalidatorRef, Context};
|
||||
use common_meta::error::Result as MetaResult;
|
||||
use common_meta::heartbeat::handler::{
|
||||
HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
|
||||
@@ -26,7 +24,7 @@ use futures::future::Either;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct InvalidateTableCacheHandler {
|
||||
table_metadata_cache_invalidator: TableMetadataCacheInvalidator,
|
||||
cache_invalidator: CacheInvalidatorRef,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -41,7 +39,7 @@ impl HeartbeatResponseHandler for InvalidateTableCacheHandler {
|
||||
|
||||
async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult<HandleControl> {
|
||||
let mailbox = ctx.mailbox.clone();
|
||||
let cache_invalidator = self.table_metadata_cache_invalidator.clone();
|
||||
let cache_invalidator = self.cache_invalidator.clone();
|
||||
|
||||
let (meta, invalidator) = match ctx.incoming_message.take() {
|
||||
Some((meta, Instruction::InvalidateTableIdCache(table_id))) => (
|
||||
@@ -86,11 +84,7 @@ impl HeartbeatResponseHandler for InvalidateTableCacheHandler {
|
||||
}
|
||||
|
||||
impl InvalidateTableCacheHandler {
|
||||
pub fn new(backend_cache_invalidator: KvCacheInvalidatorRef) -> Self {
|
||||
Self {
|
||||
table_metadata_cache_invalidator: TableMetadataCacheInvalidator::new(
|
||||
backend_cache_invalidator,
|
||||
),
|
||||
}
|
||||
pub fn new(cache_invalidator: CacheInvalidatorRef) -> Self {
|
||||
Self { cache_invalidator }
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user