From 28bed396e2c18d989e432b750808201457bf34ed Mon Sep 17 00:00:00 2001 From: shuiyisong <113876041+shuiyisong@users.noreply.github.com> Date: Wed, 20 May 2026 21:41:09 +0800 Subject: [PATCH] chore: introduce user cache invalidation api (#8129) * chore: introduce user cache invalidation api Signed-off-by: shuiyisong * chore: update using plugins hook Signed-off-by: shuiyisong --------- Signed-off-by: shuiyisong --- src/cmd/src/frontend.rs | 18 +++++++++++------- src/cmd/src/standalone.rs | 19 ++++++++++++------- src/common/meta/src/cache_invalidator.rs | 4 ++++ src/common/meta/src/instruction.rs | 8 ++++++++ src/plugins/src/frontend.rs | 6 ++++++ src/plugins/src/standalone.rs | 6 ++++++ 6 files changed, 47 insertions(+), 14 deletions(-) diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index b192bc7608..da2c111e7c 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -411,13 +411,17 @@ impl StartCommand { ); let fundamental_cache_registry = build_fundamental_cache_registry(readonly_meta_backend.clone()); - let layered_cache_registry = Arc::new( - with_default_composite_cache_registry( - layered_cache_builder.add_cache_registry(fundamental_cache_registry), - ) - .context(error::BuildCacheRegistrySnafu)? - .build(), - ); + let mut layered_cache_builder = with_default_composite_cache_registry( + layered_cache_builder.add_cache_registry(fundamental_cache_registry), + ) + .context(error::BuildCacheRegistrySnafu)?; + + if let Some(plugin_cache_builder) = plugins::frontend::configure_cache_registry(&plugins) { + layered_cache_builder = + layered_cache_builder.add_cache_registry(plugin_cache_builder.build()); + } + + let layered_cache_registry = Arc::new(layered_cache_builder.build()); // frontend to datanode need not timeout. // Some queries are expected to take long time. diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index b8a21a98c4..b0601088cf 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -411,13 +411,18 @@ impl StartCommand { // Builds cache registry let layered_cache_builder = LayeredCacheRegistryBuilder::default(); let fundamental_cache_registry = build_fundamental_cache_registry(kv_backend.clone()); - let layered_cache_registry = Arc::new( - with_default_composite_cache_registry( - layered_cache_builder.add_cache_registry(fundamental_cache_registry), - ) - .context(error::BuildCacheRegistrySnafu)? - .build(), - ); + let mut layered_cache_builder = with_default_composite_cache_registry( + layered_cache_builder.add_cache_registry(fundamental_cache_registry), + ) + .context(error::BuildCacheRegistrySnafu)?; + + if let Some(plugin_cache_builder) = plugins::standalone::configure_cache_registry(&plugins) + { + layered_cache_builder = + layered_cache_builder.add_cache_registry(plugin_cache_builder.build()); + } + + let layered_cache_registry = Arc::new(layered_cache_builder.build()); let mut builder = DatanodeBuilder::new(dn_opts, plugins.clone(), kv_backend.clone()); builder.with_cache_registry(layered_cache_registry.clone()); diff --git a/src/common/meta/src/cache_invalidator.rs b/src/common/meta/src/cache_invalidator.rs index 8a1ffc9d6f..ffc3dd1c9a 100644 --- a/src/common/meta/src/cache_invalidator.rs +++ b/src/common/meta/src/cache_invalidator.rs @@ -149,6 +149,10 @@ where let key = NodeAddressKey::with_flownode(*node_id); self.invalidate_key(&key.to_bytes()).await; } + CacheIdent::User(_) => { + // User cache invalidation is handled by external + // CacheInvalidator implementations. + } } } Ok(()) diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs index fd8d17bdff..3fa6b1bad0 100644 --- a/src/common/meta/src/instruction.rs +++ b/src/common/meta/src/instruction.rs @@ -288,6 +288,14 @@ pub enum CacheIdent { SchemaName(SchemaName), CreateFlow(CreateFlow), DropFlow(DropFlow), + /// Indicate change of user metadata. + User(UserCacheIdent), +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct UserCacheIdent { + pub catalog: String, + pub username: String, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] diff --git a/src/plugins/src/frontend.rs b/src/plugins/src/frontend.rs index 4014460020..df7ec4fcb9 100644 --- a/src/plugins/src/frontend.rs +++ b/src/plugins/src/frontend.rs @@ -14,6 +14,7 @@ use auth::{DefaultPermissionChecker, PermissionCheckerRef, UserProviderRef}; use common_base::Plugins; +use common_meta::cache::CacheRegistryBuilder; use frontend::error::{IllegalAuthConfigSnafu, Result}; use frontend::frontend::FrontendOptions; use snafu::ResultExt; @@ -54,6 +55,11 @@ pub async fn start_frontend_plugins(_plugins: Plugins) -> Result<()> { Ok(()) } +/// Allows frontend plugins to add cache invalidators to the layered registry. +pub fn configure_cache_registry(_plugins: &Plugins) -> Option { + None +} + pub mod context { use std::sync::Arc; diff --git a/src/plugins/src/standalone.rs b/src/plugins/src/standalone.rs index 0cb7ee60e5..510b84106b 100644 --- a/src/plugins/src/standalone.rs +++ b/src/plugins/src/standalone.rs @@ -13,6 +13,7 @@ // limitations under the License. use common_base::Plugins; +use common_meta::cache::CacheRegistryBuilder; use common_meta::kv_backend::KvBackendRef; use standalone::error::Result; use standalone::options::StandaloneOptions; @@ -34,6 +35,11 @@ pub async fn start_standalone_plugins(_plugins: Plugins) -> Result<()> { Ok(()) } +/// Allows standalone plugins to add cache invalidators to the layered registry. +pub fn configure_cache_registry(_plugins: &Plugins) -> Option { + None +} + pub mod context { use std::sync::Arc;