From 856a4e1e4f0df47e1773de845ab589e876ab05a4 Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Wed, 20 Mar 2024 18:18:28 +0800 Subject: [PATCH] refactor: refactor CacheInvalidator (#3550) * refactor: refactor InvalidateCache Instruction * refactor: refactor CacheInvalidator --- src/catalog/src/kvbackend/manager.rs | 33 ++++++-------- src/common/meta/src/cache_invalidator.rs | 44 ++++++++----------- src/common/meta/src/ddl/alter_table.rs | 11 ++++- .../meta/src/ddl/drop_table/executor.rs | 14 +++--- src/common/meta/src/instruction.rs | 19 +++++--- src/datanode/src/heartbeat/handler.rs | 4 +- .../handler/invalidate_table_cache.rs | 25 +++-------- src/frontend/src/heartbeat/handler/tests.rs | 11 +++-- src/meta-srv/src/cache_invalidator.rs | 13 ++---- .../region_failover/invalidate_cache.rs | 9 ++-- .../src/procedure/region_migration.rs | 9 ++-- src/operator/src/statement/ddl.rs | 26 +++++------ 12 files changed, 103 insertions(+), 115 deletions(-) diff --git a/src/catalog/src/kvbackend/manager.rs b/src/catalog/src/kvbackend/manager.rs index 05c2431a4a..6a6038f1da 100644 --- a/src/catalog/src/kvbackend/manager.rs +++ b/src/catalog/src/kvbackend/manager.rs @@ -25,13 +25,13 @@ use common_catalog::format_full_table_name; use common_error::ext::BoxedError; use common_meta::cache_invalidator::{CacheInvalidator, CacheInvalidatorRef, Context}; use common_meta::error::Result as MetaResult; +use common_meta::instruction::CacheIdent; use common_meta::key::catalog_name::CatalogNameKey; use common_meta::key::schema_name::SchemaNameKey; use common_meta::key::table_info::TableInfoValue; use common_meta::key::table_name::TableNameKey; use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; -use common_meta::table_name::TableName; use futures_util::stream::BoxStream; use futures_util::{StreamExt, TryStreamExt}; use moka::future::{Cache as AsyncCache, CacheBuilder}; @@ -39,7 +39,6 @@ use moka::sync::Cache; use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef}; use snafu::prelude::*; use table::dist_table::DistTable; -use table::metadata::TableId; use table::table::numbers::{NumbersTable, NUMBERS_TABLE_NAME}; use table::TableRef; @@ -79,24 +78,18 @@ fn make_table(table_info_value: TableInfoValue) -> CatalogResult { #[async_trait::async_trait] impl CacheInvalidator for KvBackendCatalogManager { - async fn invalidate_table_id(&self, ctx: &Context, table_id: TableId) -> MetaResult<()> { - self.cache_invalidator - .invalidate_table_id(ctx, table_id) - .await - } - - async fn invalidate_table_name(&self, ctx: &Context, table_name: TableName) -> MetaResult<()> { - let table_cache_key = format_full_table_name( - &table_name.catalog_name, - &table_name.schema_name, - &table_name.table_name, - ); - self.cache_invalidator - .invalidate_table_name(ctx, table_name) - .await?; - self.table_cache.invalidate(&table_cache_key).await; - - Ok(()) + async fn invalidate(&self, ctx: &Context, caches: Vec) -> MetaResult<()> { + for cache in &caches { + if let CacheIdent::TableName(table_name) = cache { + let table_cache_key = format_full_table_name( + &table_name.catalog_name, + &table_name.schema_name, + &table_name.table_name, + ); + self.table_cache.invalidate(&table_cache_key).await; + } + } + self.cache_invalidator.invalidate(ctx, caches).await } } diff --git a/src/common/meta/src/cache_invalidator.rs b/src/common/meta/src/cache_invalidator.rs index 16b9d3b527..7eed7f0139 100644 --- a/src/common/meta/src/cache_invalidator.rs +++ b/src/common/meta/src/cache_invalidator.rs @@ -14,14 +14,12 @@ use std::sync::Arc; -use table::metadata::TableId; - use crate::error::Result; +use crate::instruction::CacheIdent; use crate::key::table_info::TableInfoKey; use crate::key::table_name::TableNameKey; use crate::key::table_route::TableRouteKey; use crate::key::TableMetaKey; -use crate::table_name::TableName; /// KvBackend cache invalidator #[async_trait::async_trait] @@ -46,10 +44,7 @@ pub struct Context { #[async_trait::async_trait] pub trait CacheInvalidator: Send + Sync { - // Invalidates table cache - async fn invalidate_table_id(&self, ctx: &Context, table_id: TableId) -> Result<()>; - - async fn invalidate_table_name(&self, ctx: &Context, table_name: TableName) -> Result<()>; + async fn invalidate(&self, ctx: &Context, caches: Vec) -> Result<()>; } pub type CacheInvalidatorRef = Arc; @@ -58,11 +53,7 @@ pub struct DummyCacheInvalidator; #[async_trait::async_trait] impl CacheInvalidator for DummyCacheInvalidator { - async fn invalidate_table_id(&self, _ctx: &Context, _table_id: TableId) -> Result<()> { - Ok(()) - } - - async fn invalidate_table_name(&self, _ctx: &Context, _table_name: TableName) -> Result<()> { + async fn invalidate(&self, _ctx: &Context, _caches: Vec) -> Result<()> { Ok(()) } } @@ -72,21 +63,22 @@ impl CacheInvalidator for T where T: KvCacheInvalidator, { - async fn invalidate_table_name(&self, _ctx: &Context, table_name: TableName) -> Result<()> { - let key: TableNameKey = (&table_name).into(); - - self.invalidate_key(&key.as_raw_key()).await; - - Ok(()) - } - - async fn invalidate_table_id(&self, _ctx: &Context, table_id: TableId) -> Result<()> { - let key = TableInfoKey::new(table_id); - self.invalidate_key(&key.as_raw_key()).await; - - let key = &TableRouteKey { table_id }; - self.invalidate_key(&key.as_raw_key()).await; + async fn invalidate(&self, _ctx: &Context, caches: Vec) -> Result<()> { + for cache in caches { + match cache { + CacheIdent::TableId(table_id) => { + let key = TableInfoKey::new(table_id); + self.invalidate_key(&key.as_raw_key()).await; + let key = &TableRouteKey { table_id }; + self.invalidate_key(&key.as_raw_key()).await; + } + CacheIdent::TableName(table_name) => { + let key: TableNameKey = (&table_name).into(); + self.invalidate_key(&key.as_raw_key()).await + } + } + } Ok(()) } } diff --git a/src/common/meta/src/ddl/alter_table.rs b/src/common/meta/src/ddl/alter_table.rs index adffa52586..e554256a1b 100644 --- a/src/common/meta/src/ddl/alter_table.rs +++ b/src/common/meta/src/ddl/alter_table.rs @@ -43,6 +43,7 @@ use crate::cache_invalidator::Context; use crate::ddl::utils::add_peer_context_if_needed; use crate::ddl::DdlContext; use crate::error::{self, ConvertAlterTableRequestSnafu, Error, InvalidProtoMsgSnafu, Result}; +use crate::instruction::CacheIdent; use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; use crate::key::DeserializedValueWithBytes; @@ -333,11 +334,17 @@ impl AlterTableProcedure { if matches!(alter_kind, Kind::RenameTable { .. }) { cache_invalidator - .invalidate_table_name(&Context::default(), self.data.table_ref().into()) + .invalidate( + &Context::default(), + vec![CacheIdent::TableName(self.data.table_ref().into())], + ) .await?; } else { cache_invalidator - .invalidate_table_id(&Context::default(), self.data.table_id()) + .invalidate( + &Context::default(), + vec![CacheIdent::TableId(self.data.table_id())], + ) .await?; }; diff --git a/src/common/meta/src/ddl/drop_table/executor.rs b/src/common/meta/src/ddl/drop_table/executor.rs index 591c1cafb3..e7e1992b33 100644 --- a/src/common/meta/src/ddl/drop_table/executor.rs +++ b/src/common/meta/src/ddl/drop_table/executor.rs @@ -28,6 +28,7 @@ use crate::cache_invalidator::Context; use crate::ddl::utils::add_peer_context_if_needed; use crate::ddl::DdlContext; use crate::error::{self, Result}; +use crate::instruction::CacheIdent; use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; use crate::key::table_route::TableRouteValue; @@ -120,13 +121,14 @@ impl DropTableExecutor { subject: Some("Invalidate table cache by dropping table".to_string()), }; - // TODO(weny): merge these two invalidation instructions. cache_invalidator - .invalidate_table_name(&ctx, self.table.table_ref().into()) - .await?; - - cache_invalidator - .invalidate_table_id(&ctx, self.table_id) + .invalidate( + &ctx, + vec![ + CacheIdent::TableName(self.table.table_ref().into()), + CacheIdent::TableId(self.table_id), + ], + ) .await?; Ok(()) diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs index d0f5c9a27d..4b00555516 100644 --- a/src/common/meta/src/instruction.rs +++ b/src/common/meta/src/instruction.rs @@ -124,7 +124,7 @@ impl OpenRegion { } /// The instruction of downgrading leader region. -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct DowngradeRegion { /// The [RegionId]. pub region_id: RegionId, @@ -137,7 +137,7 @@ impl Display for DowngradeRegion { } /// Upgrades a follower region to leader region. -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct UpgradeRegion { /// The [RegionId]. pub region_id: RegionId, @@ -151,7 +151,14 @@ pub struct UpgradeRegion { pub wait_for_replay_timeout: Option, } -#[derive(Debug, Clone, Serialize, Deserialize, Display)] +#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq, Eq)] +/// The identifier of cache. +pub enum CacheIdent { + TableId(TableId), + TableName(TableName), +} + +#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)] pub enum Instruction { /// Opens a region. /// @@ -165,10 +172,8 @@ pub enum Instruction { UpgradeRegion(UpgradeRegion), /// Downgrades a region. DowngradeRegion(DowngradeRegion), - /// Invalidates a specified table cache. - InvalidateTableIdCache(TableId), - /// Invalidates a specified table name index cache. - InvalidateTableNameCache(TableName), + /// Invalidates batch cache. + InvalidateCaches(Vec), } /// The reply of [UpgradeRegion]. diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index 5eaa3ad88c..6b581e89ed 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -81,9 +81,7 @@ impl RegionHeartbeatResponseHandler { Instruction::UpgradeRegion(upgrade_region) => Ok(Box::new(move |handler_context| { handler_context.handle_upgrade_region_instruction(upgrade_region) })), - Instruction::InvalidateTableIdCache(_) | Instruction::InvalidateTableNameCache(_) => { - InvalidHeartbeatResponseSnafu.fail() - } + Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(), } } } diff --git a/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs b/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs index dfbd402089..48abd8fadd 100644 --- a/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs +++ b/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs @@ -20,7 +20,6 @@ use common_meta::heartbeat::handler::{ }; use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; use common_telemetry::error; -use futures::future::Either; #[derive(Clone)] pub struct InvalidateTableCacheHandler { @@ -32,8 +31,7 @@ impl HeartbeatResponseHandler for InvalidateTableCacheHandler { fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool { matches!( ctx.incoming_message.as_ref(), - Some((_, Instruction::InvalidateTableIdCache { .. })) - | Some((_, Instruction::InvalidateTableNameCache { .. })) + Some((_, Instruction::InvalidateCaches(_))) ) } @@ -42,22 +40,11 @@ impl HeartbeatResponseHandler for InvalidateTableCacheHandler { let cache_invalidator = self.cache_invalidator.clone(); let (meta, invalidator) = match ctx.incoming_message.take() { - Some((meta, Instruction::InvalidateTableIdCache(table_id))) => ( - meta, - Either::Left(async move { - cache_invalidator - .invalidate_table_id(&Context::default(), table_id) - .await - }), - ), - Some((meta, Instruction::InvalidateTableNameCache(table_name))) => ( - meta, - Either::Right(async move { - cache_invalidator - .invalidate_table_name(&Context::default(), table_name) - .await - }), - ), + Some((meta, Instruction::InvalidateCaches(caches))) => (meta, async move { + cache_invalidator + .invalidate(&Context::default(), caches) + .await + }), _ => unreachable!("InvalidateTableCacheHandler: should be guarded by 'is_acceptable'"), }; diff --git a/src/frontend/src/heartbeat/handler/tests.rs b/src/frontend/src/heartbeat/handler/tests.rs index ba9e954143..f23558cc7e 100644 --- a/src/frontend/src/heartbeat/handler/tests.rs +++ b/src/frontend/src/heartbeat/handler/tests.rs @@ -22,7 +22,7 @@ use common_meta::heartbeat::handler::{ HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor, }; use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta}; -use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; +use common_meta::instruction::{CacheIdent, Instruction, InstructionReply, SimpleReply}; use common_meta::key::table_info::TableInfoKey; use common_meta::key::TableMetaKey; use partition::manager::TableRouteCacheInvalidator; @@ -74,7 +74,7 @@ async fn test_invalidate_table_cache_handler() { handle_instruction( executor.clone(), mailbox.clone(), - Instruction::InvalidateTableIdCache(table_id), + Instruction::InvalidateCaches(vec![CacheIdent::TableId(table_id)]), ) .await; @@ -90,7 +90,12 @@ async fn test_invalidate_table_cache_handler() { .contains_key(&table_info_key.as_raw_key())); // removes a invalid key - handle_instruction(executor, mailbox, Instruction::InvalidateTableIdCache(0)).await; + handle_instruction( + executor, + mailbox, + Instruction::InvalidateCaches(vec![CacheIdent::TableId(0)]), + ) + .await; let (_, reply) = rx.recv().await.unwrap(); assert_matches!( diff --git a/src/meta-srv/src/cache_invalidator.rs b/src/meta-srv/src/cache_invalidator.rs index c830a54896..f4085b6bf9 100644 --- a/src/meta-srv/src/cache_invalidator.rs +++ b/src/meta-srv/src/cache_invalidator.rs @@ -17,10 +17,8 @@ use async_trait::async_trait; use common_error::ext::BoxedError; use common_meta::cache_invalidator::{CacheInvalidator, Context}; use common_meta::error::{self as meta_error, Result as MetaResult}; -use common_meta::instruction::Instruction; -use common_meta::table_name::TableName; +use common_meta::instruction::{CacheIdent, Instruction}; use snafu::ResultExt; -use table::metadata::TableId; use crate::metasrv::MetasrvInfo; use crate::service::mailbox::{BroadcastChannel, MailboxRef}; @@ -65,13 +63,8 @@ impl MetasrvCacheInvalidator { #[async_trait] impl CacheInvalidator for MetasrvCacheInvalidator { - async fn invalidate_table_id(&self, ctx: &Context, table_id: TableId) -> MetaResult<()> { - let instruction = Instruction::InvalidateTableIdCache(table_id); - self.broadcast(ctx, instruction).await - } - - async fn invalidate_table_name(&self, ctx: &Context, table_name: TableName) -> MetaResult<()> { - let instruction = Instruction::InvalidateTableNameCache(table_name); + async fn invalidate(&self, ctx: &Context, caches: Vec) -> MetaResult<()> { + let instruction = Instruction::InvalidateCaches(caches); self.broadcast(ctx, instruction).await } } diff --git a/src/meta-srv/src/procedure/region_failover/invalidate_cache.rs b/src/meta-srv/src/procedure/region_failover/invalidate_cache.rs index a680927631..d7231abfc8 100644 --- a/src/meta-srv/src/procedure/region_failover/invalidate_cache.rs +++ b/src/meta-srv/src/procedure/region_failover/invalidate_cache.rs @@ -14,7 +14,7 @@ use api::v1::meta::MailboxMessage; use async_trait::async_trait; -use common_meta::instruction::Instruction; +use common_meta::instruction::{CacheIdent, Instruction}; use common_meta::RegionIdent; use common_telemetry::info; use serde::{Deserialize, Serialize}; @@ -35,7 +35,7 @@ impl InvalidateCache { ctx: &RegionFailoverContext, table_id: TableId, ) -> Result<()> { - let instruction = Instruction::InvalidateTableIdCache(table_id); + let instruction = Instruction::InvalidateCaches(vec![CacheIdent::TableId(table_id)]); let msg = &MailboxMessage::json_message( "Invalidate Table Cache", @@ -133,7 +133,10 @@ mod tests { assert_eq!( received.payload, Some(Payload::Json( - serde_json::to_string(&Instruction::InvalidateTableIdCache(table_id)).unwrap(), + serde_json::to_string(&Instruction::InvalidateCaches(vec![ + CacheIdent::TableId(table_id) + ])) + .unwrap(), )) ); } diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index 9e49d266cc..b1c5a22f69 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -29,7 +29,7 @@ use std::time::Duration; use api::v1::meta::MailboxMessage; use common_error::ext::BoxedError; -use common_meta::instruction::Instruction; +use common_meta::instruction::{CacheIdent, Instruction}; use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue}; use common_meta::key::table_info::TableInfoValue; use common_meta::key::table_route::TableRouteValue; @@ -321,7 +321,7 @@ impl Context { /// Broadcasts the invalidate table cache message. pub async fn invalidate_table_cache(&self) -> Result<()> { let table_id = self.region_id().table_id(); - let instruction = Instruction::InvalidateTableIdCache(table_id); + let instruction = Instruction::InvalidateCaches(vec![CacheIdent::TableId(table_id)]); let msg = &MailboxMessage::json_message( "Invalidate Table Cache", @@ -582,7 +582,10 @@ mod tests { let msg = resp.mailbox_message.unwrap(); let instruction = HeartbeatMailbox::json_instruction(&msg).unwrap(); - assert_matches!(instruction, Instruction::InvalidateTableIdCache(1024)); + assert_eq!( + instruction, + Instruction::InvalidateCaches(vec![CacheIdent::TableId(1024)]) + ); } fn procedure_flow_steps(from_peer_id: u64, to_peer_id: u64) -> Vec { diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index c338f306b3..6500b99c29 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -24,6 +24,7 @@ use common_catalog::format_full_table_name; use common_error::ext::BoxedError; use common_meta::cache_invalidator::Context; use common_meta::ddl::ExecutorContext; +use common_meta::instruction::CacheIdent; use common_meta::key::schema_name::{SchemaNameKey, SchemaNameValue}; use common_meta::key::NAME_PATTERN; use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse}; @@ -329,12 +330,13 @@ impl StatementExecutor { // Invalidates local cache ASAP. self.cache_invalidator - .invalidate_table_id(&Context::default(), table_id) - .await - .context(error::InvalidateTableCacheSnafu)?; - - self.cache_invalidator - .invalidate_table_name(&Context::default(), table_name.clone()) + .invalidate( + &Context::default(), + vec![ + CacheIdent::TableId(table_id), + CacheIdent::TableName(table_name.clone()), + ], + ) .await .context(error::InvalidateTableCacheSnafu)?; @@ -459,14 +461,12 @@ impl StatementExecutor { // Invalidates local cache ASAP. self.cache_invalidator - .invalidate_table_id(&Context::default(), table_id) - .await - .context(error::InvalidateTableCacheSnafu)?; - - self.cache_invalidator - .invalidate_table_name( + .invalidate( &Context::default(), - TableName::new(catalog_name, schema_name, table_name), + vec![ + CacheIdent::TableId(table_id), + CacheIdent::TableName(TableName::new(catalog_name, schema_name, table_name)), + ], ) .await .context(error::InvalidateTableCacheSnafu)?;