fix: invalidate table route cache (#1663)

This commit is contained in:
Weny Xu
2023-05-29 19:49:23 +09:00
committed by GitHub
parent 8b9b5a0d3a
commit 0eaae634fa
6 changed files with 72 additions and 6 deletions

1
Cargo.lock generated
View File

@@ -5943,6 +5943,7 @@ dependencies = [
name = "partition"
version = "0.2.0"
dependencies = [
"async-trait",
"common-catalog",
"common-error",
"common-meta",

View File

@@ -19,11 +19,14 @@ use common_meta::heartbeat::handler::{
HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
};
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply, TableIdent};
use common_telemetry::error;
use common_meta::table_name::TableName;
use common_telemetry::{error, info};
use partition::manager::TableRouteCacheInvalidatorRef;
#[derive(Clone)]
pub struct InvalidateTableCacheHandler {
backend_cache_invalidator: KvCacheInvalidatorRef,
table_route_cache_invalidator: TableRouteCacheInvalidatorRef,
}
impl HeartbeatResponseHandler for InvalidateTableCacheHandler {
@@ -71,9 +74,13 @@ impl HeartbeatResponseHandler for InvalidateTableCacheHandler {
}
impl InvalidateTableCacheHandler {
pub fn new(backend_cache_invalidator: KvCacheInvalidatorRef) -> Self {
pub fn new(
backend_cache_invalidator: KvCacheInvalidatorRef,
table_route_cache_invalidator: TableRouteCacheInvalidatorRef,
) -> Self {
Self {
backend_cache_invalidator,
table_route_cache_invalidator,
}
}
@@ -84,9 +91,19 @@ impl InvalidateTableCacheHandler {
table_name: table_name.to_string(),
}
.to_string();
info!("invalidate table cache: {}", tg_key);
let tg_key = tg_key.as_bytes();
self.backend_cache_invalidator.invalidate_key(tg_key).await;
let table = &TableName {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
table_name: table_name.to_string(),
};
self.table_route_cache_invalidator
.invalidate_table_route(table)
.await;
}
}

View File

@@ -24,6 +24,8 @@ use common_meta::heartbeat::handler::{
};
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta};
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply, TableIdent};
use common_meta::table_name::TableName;
use partition::manager::TableRouteCacheInvalidator;
use tokio::sync::mpsc;
use super::invalidate_table_cache::InvalidateTableCacheHandler;
@@ -39,6 +41,17 @@ impl KvCacheInvalidator for MockKvCacheInvalidator {
}
}
pub struct MockTableRouteCacheInvalidator {
inner: Mutex<HashMap<String, i32>>,
}
#[async_trait::async_trait]
impl TableRouteCacheInvalidator for MockTableRouteCacheInvalidator {
async fn invalidate_table_route(&self, table: &TableName) {
self.inner.lock().unwrap().remove(&table.to_string());
}
}
#[tokio::test]
async fn test_invalidate_table_cache_handler() {
let table_key = TableGlobalKey {
@@ -52,8 +65,13 @@ async fn test_invalidate_table_cache_handler() {
inner: Mutex::new(inner),
});
let inner = HashMap::from([(table_key.to_string(), 1)]);
let table_route = Arc::new(MockTableRouteCacheInvalidator {
inner: Mutex::new(inner),
});
let executor = Arc::new(HandlerGroupExecutor::new(vec![Arc::new(
InvalidateTableCacheHandler::new(backend.clone()),
InvalidateTableCacheHandler::new(backend.clone(), table_route.clone()),
)]));
let (tx, mut rx) = mpsc::channel(8);
@@ -85,6 +103,18 @@ async fn test_invalidate_table_cache_handler() {
.unwrap()
.contains_key(table_key.to_string().as_bytes()));
let table_name = TableName {
catalog_name: "test".to_string(),
schema_name: "greptime".to_string(),
table_name: "foo_table".to_string(),
};
assert!(!table_route
.inner
.lock()
.unwrap()
.contains_key(&table_name.to_string()));
// removes a invalid key
handle_instruction(
executor,

View File

@@ -147,7 +147,7 @@ impl Instance {
let mut catalog_manager = FrontendCatalogManager::new(
meta_backend.clone(),
meta_backend.clone(),
partition_manager,
partition_manager.clone(),
datanode_clients.clone(),
);
@@ -178,7 +178,10 @@ impl Instance {
let handlers_executor = HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler::default()),
Arc::new(InvalidateTableCacheHandler::new(meta_backend)),
Arc::new(InvalidateTableCacheHandler::new(
meta_backend,
partition_manager,
)),
]);
let heartbeat_task = Some(HeartbeatTask::new(

View File

@@ -7,6 +7,7 @@ license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait = "0.1"
common-catalog = { path = "../common/catalog" }
common-error = { path = "../common/error" }
common-query = { path = "../common/query" }

View File

@@ -34,6 +34,13 @@ use crate::route::TableRoutes;
use crate::splitter::{InsertRequestSplit, WriteSplitter};
use crate::{error, PartitionRuleRef};
#[async_trait::async_trait]
pub trait TableRouteCacheInvalidator: Send + Sync {
async fn invalidate_table_route(&self, table: &TableName);
}
pub type TableRouteCacheInvalidatorRef = Arc<dyn TableRouteCacheInvalidator>;
pub type PartitionRuleManagerRef = Arc<PartitionRuleManager>;
/// PartitionRuleManager manages the table routes and partition rules.
@@ -50,6 +57,13 @@ pub struct PartitionInfo {
pub partition: PartitionDef,
}
#[async_trait::async_trait]
impl TableRouteCacheInvalidator for PartitionRuleManager {
async fn invalidate_table_route(&self, table: &TableName) {
self.table_routes.invalidate_table_route(table).await
}
}
impl PartitionRuleManager {
pub fn new(table_routes: Arc<TableRoutes>) -> Self {
Self { table_routes }