diff --git a/Cargo.lock b/Cargo.lock index 61f69538dd..f23abf83b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5943,6 +5943,7 @@ dependencies = [ name = "partition" version = "0.2.0" dependencies = [ + "async-trait", "common-catalog", "common-error", "common-meta", diff --git a/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs b/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs index 60fee863a1..92d905fdbb 100644 --- a/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs +++ b/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs @@ -19,11 +19,14 @@ use common_meta::heartbeat::handler::{ HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext, }; use common_meta::instruction::{Instruction, InstructionReply, SimpleReply, TableIdent}; -use common_telemetry::error; +use common_meta::table_name::TableName; +use common_telemetry::{error, info}; +use partition::manager::TableRouteCacheInvalidatorRef; #[derive(Clone)] pub struct InvalidateTableCacheHandler { backend_cache_invalidator: KvCacheInvalidatorRef, + table_route_cache_invalidator: TableRouteCacheInvalidatorRef, } impl HeartbeatResponseHandler for InvalidateTableCacheHandler { @@ -71,9 +74,13 @@ impl HeartbeatResponseHandler for InvalidateTableCacheHandler { } impl InvalidateTableCacheHandler { - pub fn new(backend_cache_invalidator: KvCacheInvalidatorRef) -> Self { + pub fn new( + backend_cache_invalidator: KvCacheInvalidatorRef, + table_route_cache_invalidator: TableRouteCacheInvalidatorRef, + ) -> Self { Self { backend_cache_invalidator, + table_route_cache_invalidator, } } @@ -84,9 +91,19 @@ impl InvalidateTableCacheHandler { table_name: table_name.to_string(), } .to_string(); - + info!("invalidate table cache: {}", tg_key); let tg_key = tg_key.as_bytes(); self.backend_cache_invalidator.invalidate_key(tg_key).await; + + let table = &TableName { + catalog_name: catalog_name.to_string(), + schema_name: schema_name.to_string(), + table_name: table_name.to_string(), + }; + + self.table_route_cache_invalidator + .invalidate_table_route(table) + .await; } } diff --git a/src/frontend/src/heartbeat/handler/tests.rs b/src/frontend/src/heartbeat/handler/tests.rs index 08dfcb3364..7e495154f0 100644 --- a/src/frontend/src/heartbeat/handler/tests.rs +++ b/src/frontend/src/heartbeat/handler/tests.rs @@ -24,6 +24,8 @@ use common_meta::heartbeat::handler::{ }; use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta}; use common_meta::instruction::{Instruction, InstructionReply, SimpleReply, TableIdent}; +use common_meta::table_name::TableName; +use partition::manager::TableRouteCacheInvalidator; use tokio::sync::mpsc; use super::invalidate_table_cache::InvalidateTableCacheHandler; @@ -39,6 +41,17 @@ impl KvCacheInvalidator for MockKvCacheInvalidator { } } +pub struct MockTableRouteCacheInvalidator { + inner: Mutex>, +} + +#[async_trait::async_trait] +impl TableRouteCacheInvalidator for MockTableRouteCacheInvalidator { + async fn invalidate_table_route(&self, table: &TableName) { + self.inner.lock().unwrap().remove(&table.to_string()); + } +} + #[tokio::test] async fn test_invalidate_table_cache_handler() { let table_key = TableGlobalKey { @@ -52,8 +65,13 @@ async fn test_invalidate_table_cache_handler() { inner: Mutex::new(inner), }); + let inner = HashMap::from([(table_key.to_string(), 1)]); + let table_route = Arc::new(MockTableRouteCacheInvalidator { + inner: Mutex::new(inner), + }); + let executor = Arc::new(HandlerGroupExecutor::new(vec![Arc::new( - InvalidateTableCacheHandler::new(backend.clone()), + InvalidateTableCacheHandler::new(backend.clone(), table_route.clone()), )])); let (tx, mut rx) = mpsc::channel(8); @@ -85,6 +103,18 @@ async fn test_invalidate_table_cache_handler() { .unwrap() .contains_key(table_key.to_string().as_bytes())); + let table_name = TableName { + catalog_name: "test".to_string(), + schema_name: "greptime".to_string(), + table_name: "foo_table".to_string(), + }; + + assert!(!table_route + .inner + .lock() + .unwrap() + .contains_key(&table_name.to_string())); + // removes a invalid key handle_instruction( executor, diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 8927f79b95..974b9f5fc0 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -147,7 +147,7 @@ impl Instance { let mut catalog_manager = FrontendCatalogManager::new( meta_backend.clone(), meta_backend.clone(), - partition_manager, + partition_manager.clone(), datanode_clients.clone(), ); @@ -178,7 +178,10 @@ impl Instance { let handlers_executor = HandlerGroupExecutor::new(vec![ Arc::new(ParseMailboxMessageHandler::default()), - Arc::new(InvalidateTableCacheHandler::new(meta_backend)), + Arc::new(InvalidateTableCacheHandler::new( + meta_backend, + partition_manager, + )), ]); let heartbeat_task = Some(HeartbeatTask::new( diff --git a/src/partition/Cargo.toml b/src/partition/Cargo.toml index c7ee679c99..0746da9646 100644 --- a/src/partition/Cargo.toml +++ b/src/partition/Cargo.toml @@ -7,6 +7,7 @@ license.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +async-trait = "0.1" common-catalog = { path = "../common/catalog" } common-error = { path = "../common/error" } common-query = { path = "../common/query" } diff --git a/src/partition/src/manager.rs b/src/partition/src/manager.rs index e1f941f67c..dbbceaaad5 100644 --- a/src/partition/src/manager.rs +++ b/src/partition/src/manager.rs @@ -34,6 +34,13 @@ use crate::route::TableRoutes; use crate::splitter::{InsertRequestSplit, WriteSplitter}; use crate::{error, PartitionRuleRef}; +#[async_trait::async_trait] +pub trait TableRouteCacheInvalidator: Send + Sync { + async fn invalidate_table_route(&self, table: &TableName); +} + +pub type TableRouteCacheInvalidatorRef = Arc; + pub type PartitionRuleManagerRef = Arc; /// PartitionRuleManager manages the table routes and partition rules. @@ -50,6 +57,13 @@ pub struct PartitionInfo { pub partition: PartitionDef, } +#[async_trait::async_trait] +impl TableRouteCacheInvalidator for PartitionRuleManager { + async fn invalidate_table_route(&self, table: &TableName) { + self.table_routes.invalidate_table_route(table).await + } +} + impl PartitionRuleManager { pub fn new(table_routes: Arc) -> Self { Self { table_routes }