refactor: refactor CacheInvalidator (#3550)

* refactor: refactor InvalidateCache Instruction

* refactor: refactor CacheInvalidator
This commit is contained in:
Weny Xu
2024-03-20 18:18:28 +08:00
committed by GitHub
parent 39b69f1e3b
commit 856a4e1e4f
12 changed files with 103 additions and 115 deletions

View File

@@ -25,13 +25,13 @@ use common_catalog::format_full_table_name;
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::{CacheInvalidator, CacheInvalidatorRef, Context};
use common_meta::error::Result as MetaResult;
use common_meta::instruction::CacheIdent;
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_name::TableNameKey;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::table_name::TableName;
use futures_util::stream::BoxStream;
use futures_util::{StreamExt, TryStreamExt};
use moka::future::{Cache as AsyncCache, CacheBuilder};
@@ -39,7 +39,6 @@ use moka::sync::Cache;
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
use snafu::prelude::*;
use table::dist_table::DistTable;
use table::metadata::TableId;
use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME};
use table::TableRef;
@@ -79,24 +78,18 @@ fn make_table(table_info_value: TableInfoValue) -> CatalogResult<TableRef> {
#[async_trait::async_trait]
impl CacheInvalidator for KvBackendCatalogManager {
async fn invalidate_table_id(&self, ctx: &Context, table_id: TableId) -> MetaResult<()> {
self.cache_invalidator
.invalidate_table_id(ctx, table_id)
.await
}
async fn invalidate_table_name(&self, ctx: &Context, table_name: TableName) -> MetaResult<()> {
let table_cache_key = format_full_table_name(
&table_name.catalog_name,
&table_name.schema_name,
&table_name.table_name,
);
self.cache_invalidator
.invalidate_table_name(ctx, table_name)
.await?;
self.table_cache.invalidate(&table_cache_key).await;
Ok(())
async fn invalidate(&self, ctx: &Context, caches: Vec<CacheIdent>) -> MetaResult<()> {
for cache in &caches {
if let CacheIdent::TableName(table_name) = cache {
let table_cache_key = format_full_table_name(
&table_name.catalog_name,
&table_name.schema_name,
&table_name.table_name,
);
self.table_cache.invalidate(&table_cache_key).await;
}
}
self.cache_invalidator.invalidate(ctx, caches).await
}
}

View File

@@ -14,14 +14,12 @@
use std::sync::Arc;
use table::metadata::TableId;
use crate::error::Result;
use crate::instruction::CacheIdent;
use crate::key::table_info::TableInfoKey;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteKey;
use crate::key::TableMetaKey;
use crate::table_name::TableName;
/// KvBackend cache invalidator
#[async_trait::async_trait]
@@ -46,10 +44,7 @@ pub struct Context {
#[async_trait::async_trait]
pub trait CacheInvalidator: Send + Sync {
// Invalidates table cache
async fn invalidate_table_id(&self, ctx: &Context, table_id: TableId) -> Result<()>;
async fn invalidate_table_name(&self, ctx: &Context, table_name: TableName) -> Result<()>;
async fn invalidate(&self, ctx: &Context, caches: Vec<CacheIdent>) -> Result<()>;
}
pub type CacheInvalidatorRef = Arc<dyn CacheInvalidator>;
@@ -58,11 +53,7 @@ pub struct DummyCacheInvalidator;
#[async_trait::async_trait]
impl CacheInvalidator for DummyCacheInvalidator {
async fn invalidate_table_id(&self, _ctx: &Context, _table_id: TableId) -> Result<()> {
Ok(())
}
async fn invalidate_table_name(&self, _ctx: &Context, _table_name: TableName) -> Result<()> {
async fn invalidate(&self, _ctx: &Context, _caches: Vec<CacheIdent>) -> Result<()> {
Ok(())
}
}
@@ -72,21 +63,22 @@ impl<T> CacheInvalidator for T
where
T: KvCacheInvalidator,
{
async fn invalidate_table_name(&self, _ctx: &Context, table_name: TableName) -> Result<()> {
let key: TableNameKey = (&table_name).into();
self.invalidate_key(&key.as_raw_key()).await;
Ok(())
}
async fn invalidate_table_id(&self, _ctx: &Context, table_id: TableId) -> Result<()> {
let key = TableInfoKey::new(table_id);
self.invalidate_key(&key.as_raw_key()).await;
let key = &TableRouteKey { table_id };
self.invalidate_key(&key.as_raw_key()).await;
async fn invalidate(&self, _ctx: &Context, caches: Vec<CacheIdent>) -> Result<()> {
for cache in caches {
match cache {
CacheIdent::TableId(table_id) => {
let key = TableInfoKey::new(table_id);
self.invalidate_key(&key.as_raw_key()).await;
let key = &TableRouteKey { table_id };
self.invalidate_key(&key.as_raw_key()).await;
}
CacheIdent::TableName(table_name) => {
let key: TableNameKey = (&table_name).into();
self.invalidate_key(&key.as_raw_key()).await
}
}
}
Ok(())
}
}

View File

@@ -43,6 +43,7 @@ use crate::cache_invalidator::Context;
use crate::ddl::utils::add_peer_context_if_needed;
use crate::ddl::DdlContext;
use crate::error::{self, ConvertAlterTableRequestSnafu, Error, InvalidProtoMsgSnafu, Result};
use crate::instruction::CacheIdent;
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::DeserializedValueWithBytes;
@@ -333,11 +334,17 @@ impl AlterTableProcedure {
if matches!(alter_kind, Kind::RenameTable { .. }) {
cache_invalidator
.invalidate_table_name(&Context::default(), self.data.table_ref().into())
.invalidate(
&Context::default(),
vec![CacheIdent::TableName(self.data.table_ref().into())],
)
.await?;
} else {
cache_invalidator
.invalidate_table_id(&Context::default(), self.data.table_id())
.invalidate(
&Context::default(),
vec![CacheIdent::TableId(self.data.table_id())],
)
.await?;
};

View File

@@ -28,6 +28,7 @@ use crate::cache_invalidator::Context;
use crate::ddl::utils::add_peer_context_if_needed;
use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::instruction::CacheIdent;
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
@@ -120,13 +121,14 @@ impl DropTableExecutor {
subject: Some("Invalidate table cache by dropping table".to_string()),
};
// TODO(weny): merge these two invalidation instructions.
cache_invalidator
.invalidate_table_name(&ctx, self.table.table_ref().into())
.await?;
cache_invalidator
.invalidate_table_id(&ctx, self.table_id)
.invalidate(
&ctx,
vec![
CacheIdent::TableName(self.table.table_ref().into()),
CacheIdent::TableId(self.table_id),
],
)
.await?;
Ok(())

View File

@@ -124,7 +124,7 @@ impl OpenRegion {
}
/// The instruction of downgrading leader region.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DowngradeRegion {
/// The [RegionId].
pub region_id: RegionId,
@@ -137,7 +137,7 @@ impl Display for DowngradeRegion {
}
/// Upgrades a follower region to leader region.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct UpgradeRegion {
/// The [RegionId].
pub region_id: RegionId,
@@ -151,7 +151,14 @@ pub struct UpgradeRegion {
pub wait_for_replay_timeout: Option<Duration>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Display)]
#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq, Eq)]
/// The identifier of cache.
pub enum CacheIdent {
TableId(TableId),
TableName(TableName),
}
#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]
pub enum Instruction {
/// Opens a region.
///
@@ -165,10 +172,8 @@ pub enum Instruction {
UpgradeRegion(UpgradeRegion),
/// Downgrades a region.
DowngradeRegion(DowngradeRegion),
/// Invalidates a specified table cache.
InvalidateTableIdCache(TableId),
/// Invalidates a specified table name index cache.
InvalidateTableNameCache(TableName),
/// Invalidates batch cache.
InvalidateCaches(Vec<CacheIdent>),
}
/// The reply of [UpgradeRegion].

View File

@@ -81,9 +81,7 @@ impl RegionHeartbeatResponseHandler {
Instruction::UpgradeRegion(upgrade_region) => Ok(Box::new(move |handler_context| {
handler_context.handle_upgrade_region_instruction(upgrade_region)
})),
Instruction::InvalidateTableIdCache(_) | Instruction::InvalidateTableNameCache(_) => {
InvalidHeartbeatResponseSnafu.fail()
}
Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(),
}
}
}

View File

@@ -20,7 +20,6 @@ use common_meta::heartbeat::handler::{
};
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_telemetry::error;
use futures::future::Either;
#[derive(Clone)]
pub struct InvalidateTableCacheHandler {
@@ -32,8 +31,7 @@ impl HeartbeatResponseHandler for InvalidateTableCacheHandler {
fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
matches!(
ctx.incoming_message.as_ref(),
Some((_, Instruction::InvalidateTableIdCache { .. }))
| Some((_, Instruction::InvalidateTableNameCache { .. }))
Some((_, Instruction::InvalidateCaches(_)))
)
}
@@ -42,22 +40,11 @@ impl HeartbeatResponseHandler for InvalidateTableCacheHandler {
let cache_invalidator = self.cache_invalidator.clone();
let (meta, invalidator) = match ctx.incoming_message.take() {
Some((meta, Instruction::InvalidateTableIdCache(table_id))) => (
meta,
Either::Left(async move {
cache_invalidator
.invalidate_table_id(&Context::default(), table_id)
.await
}),
),
Some((meta, Instruction::InvalidateTableNameCache(table_name))) => (
meta,
Either::Right(async move {
cache_invalidator
.invalidate_table_name(&Context::default(), table_name)
.await
}),
),
Some((meta, Instruction::InvalidateCaches(caches))) => (meta, async move {
cache_invalidator
.invalidate(&Context::default(), caches)
.await
}),
_ => unreachable!("InvalidateTableCacheHandler: should be guarded by 'is_acceptable'"),
};

View File

@@ -22,7 +22,7 @@ use common_meta::heartbeat::handler::{
HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor,
};
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta};
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_meta::instruction::{CacheIdent, Instruction, InstructionReply, SimpleReply};
use common_meta::key::table_info::TableInfoKey;
use common_meta::key::TableMetaKey;
use partition::manager::TableRouteCacheInvalidator;
@@ -74,7 +74,7 @@ async fn test_invalidate_table_cache_handler() {
handle_instruction(
executor.clone(),
mailbox.clone(),
Instruction::InvalidateTableIdCache(table_id),
Instruction::InvalidateCaches(vec![CacheIdent::TableId(table_id)]),
)
.await;
@@ -90,7 +90,12 @@ async fn test_invalidate_table_cache_handler() {
.contains_key(&table_info_key.as_raw_key()));
// removes a invalid key
handle_instruction(executor, mailbox, Instruction::InvalidateTableIdCache(0)).await;
handle_instruction(
executor,
mailbox,
Instruction::InvalidateCaches(vec![CacheIdent::TableId(0)]),
)
.await;
let (_, reply) = rx.recv().await.unwrap();
assert_matches!(

View File

@@ -17,10 +17,8 @@ use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::{CacheInvalidator, Context};
use common_meta::error::{self as meta_error, Result as MetaResult};
use common_meta::instruction::Instruction;
use common_meta::table_name::TableName;
use common_meta::instruction::{CacheIdent, Instruction};
use snafu::ResultExt;
use table::metadata::TableId;
use crate::metasrv::MetasrvInfo;
use crate::service::mailbox::{BroadcastChannel, MailboxRef};
@@ -65,13 +63,8 @@ impl MetasrvCacheInvalidator {
#[async_trait]
impl CacheInvalidator for MetasrvCacheInvalidator {
async fn invalidate_table_id(&self, ctx: &Context, table_id: TableId) -> MetaResult<()> {
let instruction = Instruction::InvalidateTableIdCache(table_id);
self.broadcast(ctx, instruction).await
}
async fn invalidate_table_name(&self, ctx: &Context, table_name: TableName) -> MetaResult<()> {
let instruction = Instruction::InvalidateTableNameCache(table_name);
async fn invalidate(&self, ctx: &Context, caches: Vec<CacheIdent>) -> MetaResult<()> {
let instruction = Instruction::InvalidateCaches(caches);
self.broadcast(ctx, instruction).await
}
}

View File

@@ -14,7 +14,7 @@
use api::v1::meta::MailboxMessage;
use async_trait::async_trait;
use common_meta::instruction::Instruction;
use common_meta::instruction::{CacheIdent, Instruction};
use common_meta::RegionIdent;
use common_telemetry::info;
use serde::{Deserialize, Serialize};
@@ -35,7 +35,7 @@ impl InvalidateCache {
ctx: &RegionFailoverContext,
table_id: TableId,
) -> Result<()> {
let instruction = Instruction::InvalidateTableIdCache(table_id);
let instruction = Instruction::InvalidateCaches(vec![CacheIdent::TableId(table_id)]);
let msg = &MailboxMessage::json_message(
"Invalidate Table Cache",
@@ -133,7 +133,10 @@ mod tests {
assert_eq!(
received.payload,
Some(Payload::Json(
serde_json::to_string(&Instruction::InvalidateTableIdCache(table_id)).unwrap(),
serde_json::to_string(&Instruction::InvalidateCaches(vec![
CacheIdent::TableId(table_id)
]))
.unwrap(),
))
);
}

View File

@@ -29,7 +29,7 @@ use std::time::Duration;
use api::v1::meta::MailboxMessage;
use common_error::ext::BoxedError;
use common_meta::instruction::Instruction;
use common_meta::instruction::{CacheIdent, Instruction};
use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue};
use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_route::TableRouteValue;
@@ -321,7 +321,7 @@ impl Context {
/// Broadcasts the invalidate table cache message.
pub async fn invalidate_table_cache(&self) -> Result<()> {
let table_id = self.region_id().table_id();
let instruction = Instruction::InvalidateTableIdCache(table_id);
let instruction = Instruction::InvalidateCaches(vec![CacheIdent::TableId(table_id)]);
let msg = &MailboxMessage::json_message(
"Invalidate Table Cache",
@@ -582,7 +582,10 @@ mod tests {
let msg = resp.mailbox_message.unwrap();
let instruction = HeartbeatMailbox::json_instruction(&msg).unwrap();
assert_matches!(instruction, Instruction::InvalidateTableIdCache(1024));
assert_eq!(
instruction,
Instruction::InvalidateCaches(vec![CacheIdent::TableId(1024)])
);
}
fn procedure_flow_steps(from_peer_id: u64, to_peer_id: u64) -> Vec<Step> {

View File

@@ -24,6 +24,7 @@ use common_catalog::format_full_table_name;
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::Context;
use common_meta::ddl::ExecutorContext;
use common_meta::instruction::CacheIdent;
use common_meta::key::schema_name::{SchemaNameKey, SchemaNameValue};
use common_meta::key::NAME_PATTERN;
use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
@@ -329,12 +330,13 @@ impl StatementExecutor {
// Invalidates local cache ASAP.
self.cache_invalidator
.invalidate_table_id(&Context::default(), table_id)
.await
.context(error::InvalidateTableCacheSnafu)?;
self.cache_invalidator
.invalidate_table_name(&Context::default(), table_name.clone())
.invalidate(
&Context::default(),
vec![
CacheIdent::TableId(table_id),
CacheIdent::TableName(table_name.clone()),
],
)
.await
.context(error::InvalidateTableCacheSnafu)?;
@@ -459,14 +461,12 @@ impl StatementExecutor {
// Invalidates local cache ASAP.
self.cache_invalidator
.invalidate_table_id(&Context::default(), table_id)
.await
.context(error::InvalidateTableCacheSnafu)?;
self.cache_invalidator
.invalidate_table_name(
.invalidate(
&Context::default(),
TableName::new(catalog_name, schema_name, table_name),
vec![
CacheIdent::TableId(table_id),
CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)),
],
)
.await
.context(error::InvalidateTableCacheSnafu)?;