diff --git a/src/common/meta/src/cache/flow/table_flownode.rs b/src/common/meta/src/cache/flow/table_flownode.rs index c2ccd9fd72..152d384643 100644 --- a/src/common/meta/src/cache/flow/table_flownode.rs +++ b/src/common/meta/src/cache/flow/table_flownode.rs @@ -15,6 +15,7 @@ use std::collections::HashMap; use std::sync::Arc; +use common_telemetry::info; use futures::future::BoxFuture; use moka::future::Cache; use moka::ops::compute::Op; @@ -89,6 +90,12 @@ fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer( match ident { CacheIdent::CreateFlow(create_flow) => handle_create_flow(cache, create_flow).await, CacheIdent::DropFlow(drop_flow) => handle_drop_flow(cache, drop_flow).await, + CacheIdent::FlowNodeAddressChange(node_id) => { + info!( + "Invalidate flow node cache for node_id in table_flownode: {}", + node_id + ); + cache.invalidate_all(); + } _ => {} } Ok(()) @@ -174,7 +188,10 @@ fn invalidator<'a>( } fn filter(ident: &CacheIdent) -> bool { - matches!(ident, CacheIdent::CreateFlow(_) | CacheIdent::DropFlow(_)) + matches!( + ident, + CacheIdent::CreateFlow(_) | CacheIdent::DropFlow(_) | CacheIdent::FlowNodeAddressChange(_) + ) } #[cfg(test)] diff --git a/src/common/meta/src/cache_invalidator.rs b/src/common/meta/src/cache_invalidator.rs index 41098ed69b..13ad6dff36 100644 --- a/src/common/meta/src/cache_invalidator.rs +++ b/src/common/meta/src/cache_invalidator.rs @@ -22,6 +22,7 @@ use crate::key::flow::flow_name::FlowNameKey; use crate::key::flow::flow_route::FlowRouteKey; use crate::key::flow::flownode_flow::FlownodeFlowKey; use crate::key::flow::table_flow::TableFlowKey; +use crate::key::node_address::NodeAddressKey; use crate::key::schema_name::SchemaNameKey; use crate::key::table_info::TableInfoKey; use crate::key::table_name::TableNameKey; @@ -53,6 +54,10 @@ pub struct Context { #[async_trait::async_trait] pub trait CacheInvalidator: Send + Sync { async fn invalidate(&self, ctx: &Context, caches: &[CacheIdent]) -> Result<()>; + + fn name(&self) -> &'static str { + std::any::type_name::() + } } pub type CacheInvalidatorRef = Arc; @@ -137,6 +142,13 @@ where let key = FlowInfoKey::new(*flow_id); self.invalidate_key(&key.to_bytes()).await; } + CacheIdent::FlowNodeAddressChange(node_id) => { + // other caches doesn't need to be invalidated + // since this is only for flownode address change not id change + common_telemetry::info!("Invalidate flow node cache for node_id: {}", node_id); + let key = NodeAddressKey::with_flownode(*node_id); + self.invalidate_key(&key.to_bytes()).await; + } } } Ok(()) diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs index 203262a001..1d2259310c 100644 --- a/src/common/meta/src/instruction.rs +++ b/src/common/meta/src/instruction.rs @@ -174,6 +174,8 @@ pub struct UpgradeRegion { /// The identifier of cache. pub enum CacheIdent { FlowId(FlowId), + /// Indicate change of address of flownode. + FlowNodeAddressChange(u64), FlowName(FlowName), TableId(TableId), TableName(TableName), diff --git a/src/meta-srv/src/handler/remap_flow_peer_handler.rs b/src/meta-srv/src/handler/remap_flow_peer_handler.rs index 494af777fe..ec6cdb7b5a 100644 --- a/src/meta-srv/src/handler/remap_flow_peer_handler.rs +++ b/src/meta-srv/src/handler/remap_flow_peer_handler.rs @@ -13,6 +13,7 @@ // limitations under the License. use api::v1::meta::{HeartbeatRequest, Peer, Role}; +use common_meta::instruction::CacheIdent; use common_meta::key::node_address::{NodeAddressKey, NodeAddressValue}; use common_meta::key::{MetadataKey, MetadataValue}; use common_meta::rpc::store::PutRequest; @@ -80,7 +81,19 @@ async fn rewrite_node_address(ctx: &mut Context, peer: &Peer) { match ctx.leader_cached_kv_backend.put(put).await { Ok(_) => { info!("Successfully updated flow `NodeAddressValue`: {:?}", peer); - // TODO(discord): broadcast invalidating cache to all frontends + // broadcast invalidating cache to all frontends + let cache_idents = vec![CacheIdent::FlowNodeAddressChange(peer.id)]; + info!( + "Invalidate flow node cache for new address with cache idents: {:?}", + cache_idents + ); + if let Err(e) = ctx + .cache_invalidator + .invalidate(&Default::default(), &cache_idents) + .await + { + error!(e; "Failed to invalidate {} `NodeAddressKey` cache, peer: {:?}", cache_idents.len(), peer); + } } Err(e) => { error!(e; "Failed to update flow `NodeAddressValue`: {:?}", peer);