fix: flow mirror cache (#6551)

* fix: invalid cache when flownode change address

Signed-off-by: discord9 <discord9@163.com>

* update comments

Signed-off-by: discord9 <discord9@163.com>

* fix

Signed-off-by: discord9 <discord9@163.com>

* refactor: add log&rename

Signed-off-by: discord9 <discord9@163.com>

* stuff

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2025-07-18 20:21:01 +08:00
committed by GitHub
parent efefddbc85
commit 73a3ac1320
4 changed files with 46 additions and 2 deletions

View File

@@ -15,6 +15,7 @@
use std::collections::HashMap;
use std::sync::Arc;
use common_telemetry::info;
use futures::future::BoxFuture;
use moka::future::Cache;
use moka::ops::compute::Op;
@@ -89,6 +90,12 @@ fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId,
// we have a corresponding cache invalidation mechanism to invalidate `(Key, EmptyHashSet)`.
.map(Arc::new)
.map(Some)
.inspect(|set| {
info!(
"Initialized table_flownode cache for table_id: {}, set: {:?}",
table_id, set
);
})
})
})
}
@@ -167,6 +174,13 @@ fn invalidator<'a>(
match ident {
CacheIdent::CreateFlow(create_flow) => handle_create_flow(cache, create_flow).await,
CacheIdent::DropFlow(drop_flow) => handle_drop_flow(cache, drop_flow).await,
CacheIdent::FlowNodeAddressChange(node_id) => {
info!(
"Invalidate flow node cache for node_id in table_flownode: {}",
node_id
);
cache.invalidate_all();
}
_ => {}
}
Ok(())
@@ -174,7 +188,10 @@ fn invalidator<'a>(
}
fn filter(ident: &CacheIdent) -> bool {
matches!(ident, CacheIdent::CreateFlow(_) | CacheIdent::DropFlow(_))
matches!(
ident,
CacheIdent::CreateFlow(_) | CacheIdent::DropFlow(_) | CacheIdent::FlowNodeAddressChange(_)
)
}
#[cfg(test)]

View File

@@ -22,6 +22,7 @@ use crate::key::flow::flow_name::FlowNameKey;
use crate::key::flow::flow_route::FlowRouteKey;
use crate::key::flow::flownode_flow::FlownodeFlowKey;
use crate::key::flow::table_flow::TableFlowKey;
use crate::key::node_address::NodeAddressKey;
use crate::key::schema_name::SchemaNameKey;
use crate::key::table_info::TableInfoKey;
use crate::key::table_name::TableNameKey;
@@ -53,6 +54,10 @@ pub struct Context {
#[async_trait::async_trait]
pub trait CacheInvalidator: Send + Sync {
async fn invalidate(&self, ctx: &Context, caches: &[CacheIdent]) -> Result<()>;
fn name(&self) -> &'static str {
std::any::type_name::<Self>()
}
}
pub type CacheInvalidatorRef = Arc<dyn CacheInvalidator>;
@@ -137,6 +142,13 @@ where
let key = FlowInfoKey::new(*flow_id);
self.invalidate_key(&key.to_bytes()).await;
}
CacheIdent::FlowNodeAddressChange(node_id) => {
// other caches doesn't need to be invalidated
// since this is only for flownode address change not id change
common_telemetry::info!("Invalidate flow node cache for node_id: {}", node_id);
let key = NodeAddressKey::with_flownode(*node_id);
self.invalidate_key(&key.to_bytes()).await;
}
}
}
Ok(())

View File

@@ -174,6 +174,8 @@ pub struct UpgradeRegion {
/// The identifier of cache.
pub enum CacheIdent {
FlowId(FlowId),
/// Indicate change of address of flownode.
FlowNodeAddressChange(u64),
FlowName(FlowName),
TableId(TableId),
TableName(TableName),

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use api::v1::meta::{HeartbeatRequest, Peer, Role};
use common_meta::instruction::CacheIdent;
use common_meta::key::node_address::{NodeAddressKey, NodeAddressValue};
use common_meta::key::{MetadataKey, MetadataValue};
use common_meta::rpc::store::PutRequest;
@@ -80,7 +81,19 @@ async fn rewrite_node_address(ctx: &mut Context, peer: &Peer) {
match ctx.leader_cached_kv_backend.put(put).await {
Ok(_) => {
info!("Successfully updated flow `NodeAddressValue`: {:?}", peer);
// TODO(discord): broadcast invalidating cache to all frontends
// broadcast invalidating cache to all frontends
let cache_idents = vec![CacheIdent::FlowNodeAddressChange(peer.id)];
info!(
"Invalidate flow node cache for new address with cache idents: {:?}",
cache_idents
);
if let Err(e) = ctx
.cache_invalidator
.invalidate(&Default::default(), &cache_idents)
.await
{
error!(e; "Failed to invalidate {} `NodeAddressKey` cache, peer: {:?}", cache_idents.len(), peer);
}
}
Err(e) => {
error!(e; "Failed to update flow `NodeAddressValue`: {:?}", peer);