chore: introduce user cache invalidation api (#8129)

* chore: introduce user cache invalidation api

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: update using plugins hook

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

---------

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
This commit is contained in:
shuiyisong
2026-05-20 21:41:09 +08:00
committed by GitHub
parent 12a4e934f1
commit 28bed396e2
6 changed files with 47 additions and 14 deletions

View File

@@ -411,13 +411,17 @@ impl StartCommand {
);
let fundamental_cache_registry =
build_fundamental_cache_registry(readonly_meta_backend.clone());
let layered_cache_registry = Arc::new(
with_default_composite_cache_registry(
layered_cache_builder.add_cache_registry(fundamental_cache_registry),
)
.context(error::BuildCacheRegistrySnafu)?
.build(),
);
let mut layered_cache_builder = with_default_composite_cache_registry(
layered_cache_builder.add_cache_registry(fundamental_cache_registry),
)
.context(error::BuildCacheRegistrySnafu)?;
if let Some(plugin_cache_builder) = plugins::frontend::configure_cache_registry(&plugins) {
layered_cache_builder =
layered_cache_builder.add_cache_registry(plugin_cache_builder.build());
}
let layered_cache_registry = Arc::new(layered_cache_builder.build());
// frontend to datanode need not timeout.
// Some queries are expected to take long time.

View File

@@ -411,13 +411,18 @@ impl StartCommand {
// Builds cache registry
let layered_cache_builder = LayeredCacheRegistryBuilder::default();
let fundamental_cache_registry = build_fundamental_cache_registry(kv_backend.clone());
let layered_cache_registry = Arc::new(
with_default_composite_cache_registry(
layered_cache_builder.add_cache_registry(fundamental_cache_registry),
)
.context(error::BuildCacheRegistrySnafu)?
.build(),
);
let mut layered_cache_builder = with_default_composite_cache_registry(
layered_cache_builder.add_cache_registry(fundamental_cache_registry),
)
.context(error::BuildCacheRegistrySnafu)?;
if let Some(plugin_cache_builder) = plugins::standalone::configure_cache_registry(&plugins)
{
layered_cache_builder =
layered_cache_builder.add_cache_registry(plugin_cache_builder.build());
}
let layered_cache_registry = Arc::new(layered_cache_builder.build());
let mut builder = DatanodeBuilder::new(dn_opts, plugins.clone(), kv_backend.clone());
builder.with_cache_registry(layered_cache_registry.clone());

View File

@@ -149,6 +149,10 @@ where
let key = NodeAddressKey::with_flownode(*node_id);
self.invalidate_key(&key.to_bytes()).await;
}
CacheIdent::User(_) => {
// User cache invalidation is handled by external
// CacheInvalidator implementations.
}
}
}
Ok(())

View File

@@ -288,6 +288,14 @@ pub enum CacheIdent {
SchemaName(SchemaName),
CreateFlow(CreateFlow),
DropFlow(DropFlow),
/// Indicate change of user metadata.
User(UserCacheIdent),
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct UserCacheIdent {
pub catalog: String,
pub username: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]

View File

@@ -14,6 +14,7 @@
use auth::{DefaultPermissionChecker, PermissionCheckerRef, UserProviderRef};
use common_base::Plugins;
use common_meta::cache::CacheRegistryBuilder;
use frontend::error::{IllegalAuthConfigSnafu, Result};
use frontend::frontend::FrontendOptions;
use snafu::ResultExt;
@@ -54,6 +55,11 @@ pub async fn start_frontend_plugins(_plugins: Plugins) -> Result<()> {
Ok(())
}
/// Allows frontend plugins to add cache invalidators to the layered registry.
pub fn configure_cache_registry(_plugins: &Plugins) -> Option<CacheRegistryBuilder> {
None
}
pub mod context {
use std::sync::Arc;

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use common_base::Plugins;
use common_meta::cache::CacheRegistryBuilder;
use common_meta::kv_backend::KvBackendRef;
use standalone::error::Result;
use standalone::options::StandaloneOptions;
@@ -34,6 +35,11 @@ pub async fn start_standalone_plugins(_plugins: Plugins) -> Result<()> {
Ok(())
}
/// Allows standalone plugins to add cache invalidators to the layered registry.
pub fn configure_cache_registry(_plugins: &Plugins) -> Option<CacheRegistryBuilder> {
None
}
pub mod context {
use std::sync::Arc;