fix: schema cache invalidation (#5067)

* fix: use SchemaCache to locate database metadata

* main:
 Refactor SchemaMetadataManager to use TableInfoCacheRef

 - Replace TableInfoManagerRef with TableInfoCacheRef in SchemaMetadataManager
 - Update DatanodeBuilder to pass TableInfoCacheRef to SchemaMetadataManager
 - Rename error MissingCacheRegistrySnafu to MissingCacheSnafu in datanode module
 - Adjust tests to use new mock_schema_metadata_manager with TableInfoCacheRef

* fix/schema-cache-invalidation: Add cache module and integrate cache registry into datanode

 • Implement build_datanode_cache_registry function to create cache registry for datanode
 • Integrate cache registry into datanode by modifying DatanodeBuilder and HeartbeatTask
 • Refactor InvalidateTableCacheHandler to InvalidateCacheHandler and move to common-meta crate
 • Update Cargo.toml to include cache as a dev-dependency for datanode
 • Adjust related modules (flownode, frontend, tests-integration, standalone) to use new cache handler and registry
 • Remove obsolete handler module from frontend crate

* fix: fuzz imports

* chore: add some doc for cahce builder functions

* refactor: change table info cache to table schema cache

* fix: remove unused variants

* fix fuzz

* chore: apply suggestion

Co-authored-by: Weny Xu <wenymedia@gmail.com>

* chore: apply suggestion

Co-authored-by: Weny Xu <wenymedia@gmail.com>

* fix: compile

---------

Co-authored-by: dennis zhuang <killme2008@gmail.com>
Co-authored-by: Weny Xu <wenymedia@gmail.com>
This commit is contained in:
Lei, HUANG
2024-12-03 18:44:29 +08:00
committed by GitHub
parent 51c6eafb16
commit a51853846a
27 changed files with 378 additions and 291 deletions

1
Cargo.lock generated
View File

@@ -3225,6 +3225,7 @@ dependencies = [
"arrow-flight",
"async-trait",
"bytes",
"cache",
"catalog",
"client",
"common-base",

62
src/cache/src/lib.rs vendored
View File

@@ -19,9 +19,9 @@ use std::time::Duration;
use catalog::kvbackend::new_table_cache;
use common_meta::cache::{
new_table_flownode_set_cache, new_table_info_cache, new_table_name_cache,
new_table_route_cache, new_view_info_cache, CacheRegistry, CacheRegistryBuilder,
LayeredCacheRegistryBuilder,
new_schema_cache, new_table_flownode_set_cache, new_table_info_cache, new_table_name_cache,
new_table_route_cache, new_table_schema_cache, new_view_info_cache, CacheRegistry,
CacheRegistryBuilder, LayeredCacheRegistryBuilder,
};
use common_meta::kv_backend::KvBackendRef;
use moka::future::CacheBuilder;
@@ -37,9 +37,47 @@ pub const TABLE_INFO_CACHE_NAME: &str = "table_info_cache";
pub const VIEW_INFO_CACHE_NAME: &str = "view_info_cache";
pub const TABLE_NAME_CACHE_NAME: &str = "table_name_cache";
pub const TABLE_CACHE_NAME: &str = "table_cache";
pub const SCHEMA_CACHE_NAME: &str = "schema_cache";
pub const TABLE_SCHEMA_NAME_CACHE_NAME: &str = "table_schema_name_cache";
pub const TABLE_FLOWNODE_SET_CACHE_NAME: &str = "table_flownode_set_cache";
pub const TABLE_ROUTE_CACHE_NAME: &str = "table_route_cache";
/// Builds cache registry for datanode, including:
/// - Schema cache.
/// - Table id to schema name cache.
pub fn build_datanode_cache_registry(kv_backend: KvBackendRef) -> CacheRegistry {
// Builds table id schema name cache that never expires.
let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY).build();
let table_id_schema_cache = Arc::new(new_table_schema_cache(
TABLE_SCHEMA_NAME_CACHE_NAME.to_string(),
cache,
kv_backend.clone(),
));
// Builds schema cache
let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
.time_to_live(DEFAULT_CACHE_TTL)
.time_to_idle(DEFAULT_CACHE_TTI)
.build();
let schema_cache = Arc::new(new_schema_cache(
SCHEMA_CACHE_NAME.to_string(),
cache,
kv_backend.clone(),
));
CacheRegistryBuilder::default()
.add_cache(table_id_schema_cache)
.add_cache(schema_cache)
.build()
}
/// Builds cache registry for frontend and datanode, including:
/// - Table info cache
/// - Table name cache
/// - Table route cache
/// - Table flow node cache
/// - View cache
/// - Schema cache
pub fn build_fundamental_cache_registry(kv_backend: KvBackendRef) -> CacheRegistry {
// Builds table info cache
let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
@@ -95,12 +133,30 @@ pub fn build_fundamental_cache_registry(kv_backend: KvBackendRef) -> CacheRegist
kv_backend.clone(),
));
// Builds schema cache
let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
.time_to_live(DEFAULT_CACHE_TTL)
.time_to_idle(DEFAULT_CACHE_TTI)
.build();
let schema_cache = Arc::new(new_schema_cache(
SCHEMA_CACHE_NAME.to_string(),
cache,
kv_backend.clone(),
));
let table_id_schema_cache = Arc::new(new_table_schema_cache(
TABLE_SCHEMA_NAME_CACHE_NAME.to_string(),
CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY).build(),
kv_backend,
));
CacheRegistryBuilder::default()
.add_cache(table_info_cache)
.add_cache(table_name_cache)
.add_cache(table_route_cache)
.add_cache(view_info_cache)
.add_cache(table_flownode_set_cache)
.add_cache(schema_cache)
.add_cache(table_id_schema_cache)
.build()
}

View File

@@ -16,10 +16,12 @@ use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use cache::build_datanode_cache_registry;
use catalog::kvbackend::MetaKvBackend;
use clap::Parser;
use common_base::Plugins;
use common_config::Configurable;
use common_meta::cache::LayeredCacheRegistryBuilder;
use common_telemetry::logging::TracingOptions;
use common_telemetry::{info, warn};
use common_version::{short_version, version};
@@ -300,9 +302,17 @@ impl StartCommand {
client: meta_client.clone(),
});
// Builds cache registry for datanode.
let layered_cache_registry = Arc::new(
LayeredCacheRegistryBuilder::default()
.add_cache_registry(build_datanode_cache_registry(meta_backend.clone()))
.build(),
);
let mut datanode = DatanodeBuilder::new(opts.clone(), plugins)
.with_meta_client(meta_client)
.with_kv_backend(meta_backend)
.with_cache_registry(layered_cache_registry)
.build()
.await
.context(StartDatanodeSnafu)?;

View File

@@ -22,6 +22,7 @@ use common_base::Plugins;
use common_config::Configurable;
use common_grpc::channel_manager::ChannelConfig;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_meta::key::flow::FlowMetadataManager;
@@ -30,7 +31,6 @@ use common_telemetry::info;
use common_telemetry::logging::TracingOptions;
use common_version::{short_version, version};
use flow::{FlownodeBuilder, FlownodeInstance, FrontendInvoker};
use frontend::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler;
use meta_client::{MetaClientOptions, MetaClientType};
use servers::Mode;
use snafu::{OptionExt, ResultExt};
@@ -288,9 +288,7 @@ impl StartCommand {
let executor = HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler),
Arc::new(InvalidateTableCacheHandler::new(
layered_cache_registry.clone(),
)),
Arc::new(InvalidateCacheHandler::new(layered_cache_registry.clone())),
]);
let heartbeat_task = flow::heartbeat::HeartbeatTask::new(

View File

@@ -24,13 +24,13 @@ use common_base::Plugins;
use common_config::Configurable;
use common_grpc::channel_manager::ChannelConfig;
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_telemetry::info;
use common_telemetry::logging::TracingOptions;
use common_time::timezone::set_default_timezone;
use common_version::{short_version, version};
use frontend::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler;
use frontend::heartbeat::HeartbeatTask;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{FrontendInstance, Instance as FeInstance};
@@ -328,9 +328,7 @@ impl StartCommand {
let executor = HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler),
Arc::new(InvalidateTableCacheHandler::new(
layered_cache_registry.clone(),
)),
Arc::new(InvalidateCacheHandler::new(layered_cache_registry.clone())),
]);
let heartbeat_task = HeartbeatTask::new(

View File

@@ -497,6 +497,7 @@ impl StartCommand {
let datanode = DatanodeBuilder::new(dn_opts, plugins.clone())
.with_kv_backend(kv_backend.clone())
.with_cache_registry(layered_cache_registry.clone())
.build()
.await
.context(StartDatanodeSnafu)?;

View File

@@ -24,7 +24,8 @@ pub use registry::{
LayeredCacheRegistryBuilder, LayeredCacheRegistryRef,
};
pub use table::{
new_table_info_cache, new_table_name_cache, new_table_route_cache, new_view_info_cache,
TableInfoCache, TableInfoCacheRef, TableNameCache, TableNameCacheRef, TableRoute,
TableRouteCache, TableRouteCacheRef, ViewInfoCache, ViewInfoCacheRef,
new_schema_cache, new_table_info_cache, new_table_name_cache, new_table_route_cache,
new_table_schema_cache, new_view_info_cache, SchemaCache, SchemaCacheRef, TableInfoCache,
TableInfoCacheRef, TableNameCache, TableNameCacheRef, TableRoute, TableRouteCache,
TableRouteCacheRef, TableSchemaCache, TableSchemaCacheRef, ViewInfoCache, ViewInfoCacheRef,
};

View File

@@ -12,12 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod schema;
mod table_info;
mod table_name;
mod table_route;
mod table_schema;
mod view_info;
pub use schema::{new_schema_cache, SchemaCache, SchemaCacheRef};
pub use table_info::{new_table_info_cache, TableInfoCache, TableInfoCacheRef};
pub use table_name::{new_table_name_cache, TableNameCache, TableNameCacheRef};
pub use table_route::{new_table_route_cache, TableRoute, TableRouteCache, TableRouteCacheRef};
pub use table_schema::{new_table_schema_cache, TableSchemaCache, TableSchemaCacheRef};
pub use view_info::{new_view_info_cache, ViewInfoCache, ViewInfoCacheRef};

View File

@@ -0,0 +1,73 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use futures_util::future::BoxFuture;
use moka::future::Cache;
use snafu::OptionExt;
use crate::cache::{CacheContainer, Initializer};
use crate::error::ValueNotExistSnafu;
use crate::instruction::CacheIdent;
use crate::key::schema_name::{SchemaManager, SchemaName, SchemaNameKey, SchemaNameValue};
use crate::kv_backend::KvBackendRef;
pub type SchemaCache = CacheContainer<SchemaName, Arc<SchemaNameValue>, CacheIdent>;
pub type SchemaCacheRef = Arc<SchemaCache>;
/// Constructs a [SchemaCache].
pub fn new_schema_cache(
name: String,
cache: Cache<SchemaName, Arc<SchemaNameValue>>,
kv_backend: KvBackendRef,
) -> SchemaCache {
let schema_manager = SchemaManager::new(kv_backend.clone());
let init = init_factory(schema_manager);
CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter))
}
fn init_factory(schema_manager: SchemaManager) -> Initializer<SchemaName, Arc<SchemaNameValue>> {
Arc::new(move |schema_name| {
let manager = schema_manager.clone();
Box::pin(async move {
let schema_value = manager
.get(SchemaNameKey {
catalog: &schema_name.catalog_name,
schema: &schema_name.schema_name,
})
.await?
.context(ValueNotExistSnafu)?
.into_inner();
Ok(Some(Arc::new(schema_value)))
})
})
}
fn invalidator<'a>(
cache: &'a Cache<SchemaName, Arc<SchemaNameValue>>,
ident: &'a CacheIdent,
) -> BoxFuture<'a, crate::error::Result<()>> {
Box::pin(async move {
if let CacheIdent::SchemaName(schema_name) = ident {
cache.invalidate(schema_name).await
}
Ok(())
})
}
fn filter(ident: &CacheIdent) -> bool {
matches!(ident, CacheIdent::SchemaName(_))
}

View File

@@ -0,0 +1,76 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Cache for table id to schema name mapping.
use std::sync::Arc;
use futures_util::future::BoxFuture;
use moka::future::Cache;
use snafu::OptionExt;
use store_api::storage::TableId;
use crate::cache::{CacheContainer, Initializer};
use crate::error;
use crate::instruction::CacheIdent;
use crate::key::schema_name::SchemaName;
use crate::key::table_info::TableInfoManager;
use crate::kv_backend::KvBackendRef;
pub type TableSchemaCache = CacheContainer<TableId, Arc<SchemaName>, CacheIdent>;
pub type TableSchemaCacheRef = Arc<TableSchemaCache>;
/// Constructs a [TableSchemaCache].
pub fn new_table_schema_cache(
name: String,
cache: Cache<TableId, Arc<SchemaName>>,
kv_backend: KvBackendRef,
) -> TableSchemaCache {
let table_info_manager = TableInfoManager::new(kv_backend);
let init = init_factory(table_info_manager);
CacheContainer::new(name, cache, Box::new(invalidator), init, Box::new(filter))
}
fn init_factory(table_info_manager: TableInfoManager) -> Initializer<TableId, Arc<SchemaName>> {
Arc::new(move |table_id| {
let table_info_manager = table_info_manager.clone();
Box::pin(async move {
let raw_table_info = table_info_manager
.get(*table_id)
.await?
.context(error::ValueNotExistSnafu)?
.into_inner()
.table_info;
Ok(Some(Arc::new(SchemaName {
catalog_name: raw_table_info.catalog_name,
schema_name: raw_table_info.schema_name,
})))
})
})
}
/// Never invalidates table id schema cache.
fn invalidator<'a>(
_cache: &'a Cache<TableId, Arc<SchemaName>>,
_ident: &'a CacheIdent,
) -> BoxFuture<'a, error::Result<()>> {
Box::pin(std::future::ready(Ok(())))
}
/// Never invalidates table id schema cache.
fn filter(_ident: &CacheIdent) -> bool {
false
}

View File

@@ -21,6 +21,7 @@ use common_telemetry::error;
use crate::error::Result;
use crate::heartbeat::mailbox::{IncomingMessage, MailboxRef};
pub mod invalidate_table_cache;
pub mod parse_mailbox_message;
#[cfg(test)]
mod tests;

View File

@@ -13,21 +13,22 @@
// limitations under the License.
use async_trait::async_trait;
use common_meta::cache_invalidator::{CacheInvalidatorRef, Context};
use common_meta::error::Result as MetaResult;
use common_meta::heartbeat::handler::{
HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
};
use common_meta::instruction::Instruction;
use common_telemetry::debug;
use crate::cache_invalidator::{CacheInvalidatorRef, Context};
use crate::error::Result as MetaResult;
use crate::heartbeat::handler::{
HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
};
use crate::instruction::Instruction;
#[derive(Clone)]
pub struct InvalidateTableCacheHandler {
pub struct InvalidateCacheHandler {
cache_invalidator: CacheInvalidatorRef,
}
#[async_trait]
impl HeartbeatResponseHandler for InvalidateTableCacheHandler {
impl HeartbeatResponseHandler for InvalidateCacheHandler {
fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
matches!(
ctx.incoming_message.as_ref(),
@@ -37,13 +38,10 @@ impl HeartbeatResponseHandler for InvalidateTableCacheHandler {
async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult<HandleControl> {
let Some((_, Instruction::InvalidateCaches(caches))) = ctx.incoming_message.take() else {
unreachable!("InvalidateTableCacheHandler: should be guarded by 'is_acceptable'")
unreachable!("InvalidateCacheHandler: should be guarded by 'is_acceptable'")
};
debug!(
"InvalidateTableCacheHandler: invalidating caches: {:?}",
caches
);
debug!("InvalidateCacheHandler: invalidating caches: {:?}", caches);
// Invalidate local cache always success
let _ = self
@@ -55,7 +53,7 @@ impl HeartbeatResponseHandler for InvalidateTableCacheHandler {
}
}
impl InvalidateTableCacheHandler {
impl InvalidateCacheHandler {
pub fn new(cache_invalidator: CacheInvalidatorRef) -> Self {
Self { cache_invalidator }
}

View File

@@ -19,41 +19,39 @@ use std::sync::Arc;
use snafu::OptionExt;
use store_api::storage::TableId;
use crate::cache::{SchemaCacheRef, TableSchemaCacheRef};
use crate::error::TableInfoNotFoundSnafu;
use crate::key::schema_name::{SchemaManager, SchemaNameKey};
use crate::key::table_info::{TableInfoManager, TableInfoManagerRef};
use crate::kv_backend::KvBackendRef;
use crate::{error, SchemaOptions};
pub type SchemaMetadataManagerRef = Arc<SchemaMetadataManager>;
pub struct SchemaMetadataManager {
table_info_manager: TableInfoManagerRef,
schema_manager: SchemaManager,
table_id_schema_cache: TableSchemaCacheRef,
schema_cache: SchemaCacheRef,
#[cfg(any(test, feature = "testing"))]
kv_backend: KvBackendRef,
kv_backend: crate::kv_backend::KvBackendRef,
}
impl SchemaMetadataManager {
/// Creates a new database meta
#[cfg(not(any(test, feature = "testing")))]
pub fn new(kv_backend: KvBackendRef) -> Self {
let table_info_manager = Arc::new(TableInfoManager::new(kv_backend.clone()));
let schema_manager = SchemaManager::new(kv_backend);
pub fn new(table_id_schema_cache: TableSchemaCacheRef, schema_cache: SchemaCacheRef) -> Self {
Self {
table_info_manager,
schema_manager,
table_id_schema_cache,
schema_cache,
}
}
/// Creates a new database meta
#[cfg(any(test, feature = "testing"))]
pub fn new(kv_backend: KvBackendRef) -> Self {
let table_info_manager = Arc::new(TableInfoManager::new(kv_backend.clone()));
let schema_manager = SchemaManager::new(kv_backend.clone());
pub fn new(
kv_backend: crate::kv_backend::KvBackendRef,
table_id_schema_cache: TableSchemaCacheRef,
schema_cache: SchemaCacheRef,
) -> Self {
Self {
table_info_manager,
schema_manager,
table_id_schema_cache,
schema_cache,
kv_backend,
}
}
@@ -62,23 +60,16 @@ impl SchemaMetadataManager {
pub async fn get_schema_options_by_table_id(
&self,
table_id: TableId,
) -> error::Result<Option<SchemaOptions>> {
let table_info = self
.table_info_manager
) -> error::Result<Option<Arc<SchemaOptions>>> {
let schema_name = self
.table_id_schema_cache
.get(table_id)
.await?
.with_context(|| TableInfoNotFoundSnafu {
table: format!("table id: {}", table_id),
})?;
let key = SchemaNameKey::new(
&table_info.table_info.catalog_name,
&table_info.table_info.schema_name,
);
self.schema_manager
.get(key)
.await
.map(|v| v.map(|v| v.into_inner()))
self.schema_cache.get_by_ref(&schema_name).await
}
#[cfg(any(test, feature = "testing"))]
@@ -100,17 +91,19 @@ impl SchemaMetadataManager {
meta: Default::default(),
table_type: TableType::Base,
});
let (txn, _) = self
.table_info_manager
let table_info_manager =
crate::key::table_info::TableInfoManager::new(self.kv_backend.clone());
let (txn, _) = table_info_manager
.build_create_txn(table_id, &value)
.unwrap();
let resp = self.kv_backend.txn(txn).await.unwrap();
assert!(resp.succeeded, "Failed to create table metadata");
let key = SchemaNameKey {
let key = crate::key::schema_name::SchemaNameKey {
catalog: catalog_name,
schema: schema_name,
};
self.schema_manager
crate::key::schema_name::SchemaManager::new(self.kv_backend.clone())
.create(key, schema_value, false)
.await
.expect("Failed to create schema metadata");

View File

@@ -171,6 +171,7 @@ impl<'a> TryFrom<&'a str> for SchemaNameKey<'a> {
}
}
#[derive(Clone)]
pub struct SchemaManager {
kv_backend: KvBackendRef,
}

View File

@@ -65,6 +65,7 @@ toml.workspace = true
tonic.workspace = true
[dev-dependencies]
cache.workspace = true
client.workspace = true
common-meta = { workspace = true, features = ["testing"] }
common-query.workspace = true

View File

@@ -18,11 +18,11 @@ use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use catalog::kvbackend::CachedKvBackendBuilder;
use catalog::memory::MemoryCatalogManager;
use common_base::Plugins;
use common_error::ext::BoxedError;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::cache::{LayeredCacheRegistry, SchemaCacheRef, TableSchemaCacheRef};
use common_meta::key::datanode_table::{DatanodeTableManager, DatanodeTableValue};
use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
@@ -57,9 +57,9 @@ use tokio::sync::Notify;
use crate::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
use crate::error::{
self, BuildMitoEngineSnafu, CreateDirSnafu, GetMetadataSnafu, MissingKvBackendSnafu,
MissingNodeIdSnafu, OpenLogStoreSnafu, Result, ShutdownInstanceSnafu, ShutdownServerSnafu,
StartServerSnafu,
self, BuildMitoEngineSnafu, CreateDirSnafu, GetMetadataSnafu, MissingCacheSnafu,
MissingKvBackendSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result, ShutdownInstanceSnafu,
ShutdownServerSnafu, StartServerSnafu,
};
use crate::event_listener::{
new_region_server_event_channel, NoopRegionServerEventListener, RegionServerEventListenerRef,
@@ -160,6 +160,7 @@ pub struct DatanodeBuilder {
plugins: Plugins,
meta_client: Option<MetaClientRef>,
kv_backend: Option<KvBackendRef>,
cache_registry: Option<Arc<LayeredCacheRegistry>>,
}
impl DatanodeBuilder {
@@ -171,6 +172,7 @@ impl DatanodeBuilder {
plugins,
meta_client: None,
kv_backend: None,
cache_registry: None,
}
}
@@ -181,6 +183,13 @@ impl DatanodeBuilder {
}
}
pub fn with_cache_registry(self, cache_registry: Arc<LayeredCacheRegistry>) -> Self {
Self {
cache_registry: Some(cache_registry),
..self
}
}
pub fn with_kv_backend(self, kv_backend: KvBackendRef) -> Self {
Self {
kv_backend: Some(kv_backend),
@@ -209,10 +218,16 @@ impl DatanodeBuilder {
(Box::new(NoopRegionServerEventListener) as _, None)
};
let cached_kv_backend = Arc::new(CachedKvBackendBuilder::new(kv_backend.clone()).build());
let cache_registry = self.cache_registry.take().context(MissingCacheSnafu)?;
let schema_cache: SchemaCacheRef = cache_registry.get().context(MissingCacheSnafu)?;
let table_id_schema_cache: TableSchemaCacheRef =
cache_registry.get().context(MissingCacheSnafu)?;
let schema_metadata_manager =
Arc::new(SchemaMetadataManager::new(cached_kv_backend.clone()));
let schema_metadata_manager = Arc::new(SchemaMetadataManager::new(
kv_backend.clone(),
table_id_schema_cache,
schema_cache,
));
let region_server = self
.new_region_server(schema_metadata_manager, region_event_listener)
.await?;
@@ -248,7 +263,7 @@ impl DatanodeBuilder {
&self.opts,
region_server.clone(),
meta_client,
cached_kv_backend,
cache_registry,
)
.await?,
)
@@ -591,7 +606,9 @@ mod tests {
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use cache::build_datanode_cache_registry;
use common_base::Plugins;
use common_meta::cache::LayeredCacheRegistryBuilder;
use common_meta::key::datanode_table::DatanodeTableManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
@@ -628,13 +645,21 @@ mod tests {
mock_region_server.register_engine(mock_region.clone());
let kv_backend = Arc::new(MemoryKvBackend::new());
let layered_cache_registry = Arc::new(
LayeredCacheRegistryBuilder::default()
.add_cache_registry(build_datanode_cache_registry(kv_backend))
.build(),
);
let builder = DatanodeBuilder::new(
DatanodeOptions {
node_id: Some(0),
..Default::default()
},
Plugins::default(),
);
)
.with_cache_registry(layered_cache_registry);
let kv = Arc::new(MemoryKvBackend::default()) as _;
setup_table_datanode(&kv).await;

View File

@@ -364,6 +364,12 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Cache not found in registry"))]
MissingCache {
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -431,6 +437,7 @@ impl ErrorExt for Error {
ConcurrentQueryLimiterClosed { .. } | ConcurrentQueryLimiterTimeout { .. } => {
StatusCode::RegionBusy
}
MissingCache { .. } => StatusCode::Internal,
}
}

View File

@@ -18,9 +18,10 @@ use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat};
use catalog::kvbackend::CachedKvBackend;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::datanode::REGION_STATISTIC_KEY;
use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::{
HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
@@ -40,7 +41,6 @@ use crate::alive_keeper::RegionAliveKeeper;
use crate::config::DatanodeOptions;
use crate::error::{self, MetaClientInitSnafu, Result};
use crate::event_listener::RegionServerEventReceiver;
use crate::heartbeat::handler::cache_invalidator::InvalidateSchemaCacheHandler;
use crate::metrics::{self, HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
use crate::region_server::RegionServer;
@@ -72,7 +72,7 @@ impl HeartbeatTask {
opts: &DatanodeOptions,
region_server: RegionServer,
meta_client: MetaClientRef,
cache_kv_backend: Arc<CachedKvBackend>,
cache_invalidator: CacheInvalidatorRef,
) -> Result<Self> {
let region_alive_keeper = Arc::new(RegionAliveKeeper::new(
region_server.clone(),
@@ -82,7 +82,7 @@ impl HeartbeatTask {
region_alive_keeper.clone(),
Arc::new(ParseMailboxMessageHandler),
Arc::new(RegionHeartbeatResponseHandler::new(region_server.clone())),
Arc::new(InvalidateSchemaCacheHandler::new(cache_kv_backend)),
Arc::new(InvalidateCacheHandler::new(cache_invalidator)),
]));
Ok(Self {

View File

@@ -24,7 +24,6 @@ use futures::future::BoxFuture;
use snafu::OptionExt;
use store_api::storage::RegionId;
pub(crate) mod cache_invalidator;
mod close_region;
mod downgrade_region;
mod open_region;

View File

@@ -1,167 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Schema cache invalidator handler
use std::sync::Arc;
use async_trait::async_trait;
use catalog::kvbackend::CachedKvBackend;
use common_meta::cache_invalidator::KvCacheInvalidator;
use common_meta::heartbeat::handler::{
HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
};
use common_meta::instruction::{CacheIdent, Instruction};
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::MetadataKey;
use common_telemetry::debug;
#[derive(Clone)]
pub(crate) struct InvalidateSchemaCacheHandler {
cached_kv_backend: Arc<CachedKvBackend>,
}
#[async_trait]
impl HeartbeatResponseHandler for InvalidateSchemaCacheHandler {
fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
matches!(
ctx.incoming_message.as_ref(),
Some((_, Instruction::InvalidateCaches(_)))
)
}
async fn handle(
&self,
ctx: &mut HeartbeatResponseHandlerContext,
) -> common_meta::error::Result<HandleControl> {
let Some((_, Instruction::InvalidateCaches(caches))) = ctx.incoming_message.take() else {
unreachable!("InvalidateSchemaCacheHandler: should be guarded by 'is_acceptable'")
};
debug!(
"InvalidateSchemaCacheHandler: invalidating caches: {:?}",
caches
);
for cache in caches {
let CacheIdent::SchemaName(schema_name) = cache else {
continue;
};
let key: SchemaNameKey = (&schema_name).into();
let key_bytes = key.to_bytes();
// invalidate cache
self.cached_kv_backend.invalidate_key(&key_bytes).await;
}
Ok(HandleControl::Done)
}
}
impl InvalidateSchemaCacheHandler {
pub fn new(cached_kv_backend: Arc<CachedKvBackend>) -> Self {
Self { cached_kv_backend }
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::HeartbeatResponse;
use catalog::kvbackend::CachedKvBackendBuilder;
use common_meta::heartbeat::handler::{
HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor,
};
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta};
use common_meta::instruction::{CacheIdent, Instruction};
use common_meta::key::schema_name::{SchemaName, SchemaNameKey, SchemaNameValue};
use common_meta::key::{MetadataKey, SchemaMetadataManager};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackend;
use common_meta::rpc::store::PutRequest;
use crate::heartbeat::handler::cache_invalidator::InvalidateSchemaCacheHandler;
#[tokio::test]
async fn test_invalidate_schema_cache_handler() {
let inner_kv = Arc::new(MemoryKvBackend::default());
let cached_kv = Arc::new(CachedKvBackendBuilder::new(inner_kv.clone()).build());
let schema_metadata_manager = SchemaMetadataManager::new(cached_kv.clone());
let schema_name = "test_schema";
let catalog_name = "test_catalog";
schema_metadata_manager
.register_region_table_info(
1,
"test_table",
schema_name,
catalog_name,
Some(SchemaNameValue {
ttl: Some(Duration::from_secs(1)),
}),
)
.await;
schema_metadata_manager
.get_schema_options_by_table_id(1)
.await
.unwrap();
let schema_key = SchemaNameKey::new(catalog_name, schema_name).to_bytes();
let new_schema_value = SchemaNameValue {
ttl: Some(Duration::from_secs(3)),
}
.try_as_raw_value()
.unwrap();
inner_kv
.put(PutRequest {
key: schema_key.clone(),
value: new_schema_value,
prev_kv: false,
})
.await
.unwrap();
let executor = Arc::new(HandlerGroupExecutor::new(vec![Arc::new(
InvalidateSchemaCacheHandler::new(cached_kv),
)]));
let (tx, _) = tokio::sync::mpsc::channel(8);
let mailbox = Arc::new(HeartbeatMailbox::new(tx));
// removes a valid key
let response = HeartbeatResponse::default();
let mut ctx: HeartbeatResponseHandlerContext =
HeartbeatResponseHandlerContext::new(mailbox, response);
ctx.incoming_message = Some((
MessageMeta::new_test(1, "hi", "foo", "bar"),
Instruction::InvalidateCaches(vec![CacheIdent::SchemaName(SchemaName {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
})]),
));
executor.handle(ctx).await.unwrap();
assert_eq!(
Some(Duration::from_secs(3)),
SchemaNameValue::try_from_raw_value(
&inner_kv.get(&schema_key).await.unwrap().unwrap().value
)
.unwrap()
.unwrap()
.ttl
);
}
}

View File

@@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#[cfg(test)]
mod tests;
use std::sync::Arc;
use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer};
@@ -34,8 +37,6 @@ use crate::error::Result;
use crate::frontend::FrontendOptions;
use crate::metrics::{HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
pub mod handler;
/// The frontend heartbeat task which sending `[HeartbeatRequest]` to Metasrv periodically in background.
#[derive(Clone)]
pub struct HeartbeatTask {

View File

@@ -1,18 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod invalidate_table_cache;
#[cfg(test)]
pub(crate) mod tests;

View File

@@ -17,6 +17,7 @@ use std::sync::{Arc, Mutex};
use api::v1::meta::HeartbeatResponse;
use common_meta::cache_invalidator::KvCacheInvalidator;
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
use common_meta::heartbeat::handler::{
HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor,
};
@@ -29,8 +30,6 @@ use partition::manager::TableRouteCacheInvalidator;
use table::metadata::TableId;
use tokio::sync::mpsc;
use super::invalidate_table_cache::InvalidateTableCacheHandler;
#[derive(Default)]
pub struct MockKvCacheInvalidator {
inner: Mutex<HashMap<Vec<u8>, i32>>,
@@ -85,7 +84,7 @@ async fn test_invalidate_table_cache_handler() {
});
let executor = Arc::new(HandlerGroupExecutor::new(vec![Arc::new(
InvalidateTableCacheHandler::new(backend.clone()),
InvalidateCacheHandler::new(backend.clone()),
)]));
let (tx, _) = mpsc::channel(8);
@@ -124,7 +123,7 @@ async fn test_invalidate_schema_key_handler() {
});
let executor = Arc::new(HandlerGroupExecutor::new(vec![Arc::new(
InvalidateTableCacheHandler::new(backend.clone()),
InvalidateCacheHandler::new(backend.clone()),
)]));
let (tx, _) = mpsc::channel(8);

View File

@@ -679,12 +679,10 @@ fn get_expired_ssts(
#[cfg(test)]
mod tests {
use common_meta::key::SchemaMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
use tokio::sync::oneshot;
use super::*;
use crate::test_util::mock_schema_metadata_manager;
use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
use crate::test_util::version_util::{apply_edit, VersionControlBuilder};
@@ -694,10 +692,7 @@ mod tests {
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_compaction_scheduler(tx);
let mut builder = VersionControlBuilder::new();
let schema_metadata_manager = Arc::new(SchemaMetadataManager::new(Arc::new(
MemoryKvBackend::new(),
)
as KvBackendRef));
let schema_metadata_manager = mock_schema_metadata_manager();
schema_metadata_manager
.register_region_table_info(
builder.region_id().table_id(),
@@ -760,10 +755,8 @@ mod tests {
let mut builder = VersionControlBuilder::new();
let purger = builder.file_purger();
let region_id = builder.region_id();
let schema_metadata_manager = Arc::new(SchemaMetadataManager::new(Arc::new(
MemoryKvBackend::new(),
)
as KvBackendRef));
let schema_metadata_manager = mock_schema_metadata_manager();
schema_metadata_manager
.register_region_table_info(
builder.region_id().table_id(),

View File

@@ -35,6 +35,8 @@ use api::v1::{OpType, Row, Rows, SemanticType};
use common_base::readable_size::ReadableSize;
use common_base::Plugins;
use common_datasource::compression::CompressionType;
use common_meta::cache::{new_schema_cache, new_table_info_cache, new_table_schema_cache};
use common_meta::key::schema_name::{SchemaName, SchemaNameValue};
use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackendRef;
@@ -47,6 +49,7 @@ use datatypes::schema::ColumnSchema;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use log_store::test_util::log_store_util;
use moka::future::{Cache, CacheBuilder};
use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
use object_store::services::Fs;
use object_store::ObjectStore;
@@ -210,43 +213,37 @@ impl Default for TestEnv {
impl TestEnv {
/// Returns a new env with empty prefix for test.
pub fn new() -> TestEnv {
let schema_metadata_manager = mock_schema_metadata_manager();
TestEnv {
data_home: create_temp_dir(""),
log_store: None,
log_store_factory: LogStoreFactory::RaftEngine(RaftEngineLogStoreFactory),
object_store_manager: None,
schema_metadata_manager: Arc::new(SchemaMetadataManager::new(Arc::new(
MemoryKvBackend::new(),
)
as KvBackendRef)),
schema_metadata_manager,
}
}
/// Returns a new env with specific `prefix` for test.
pub fn with_prefix(prefix: &str) -> TestEnv {
let schema_metadata_manager = mock_schema_metadata_manager();
TestEnv {
data_home: create_temp_dir(prefix),
log_store: None,
log_store_factory: LogStoreFactory::RaftEngine(RaftEngineLogStoreFactory),
object_store_manager: None,
schema_metadata_manager: Arc::new(SchemaMetadataManager::new(Arc::new(
MemoryKvBackend::new(),
)
as KvBackendRef)),
schema_metadata_manager,
}
}
/// Returns a new env with specific `data_home` for test.
pub fn with_data_home(data_home: TempDir) -> TestEnv {
let schema_metadata_manager = mock_schema_metadata_manager();
TestEnv {
data_home,
log_store: None,
log_store_factory: LogStoreFactory::RaftEngine(RaftEngineLogStoreFactory),
object_store_manager: None,
schema_metadata_manager: Arc::new(SchemaMetadataManager::new(Arc::new(
MemoryKvBackend::new(),
)
as KvBackendRef)),
schema_metadata_manager,
}
}
@@ -1154,3 +1151,22 @@ pub async fn reopen_region(
.unwrap();
}
}
pub(crate) fn mock_schema_metadata_manager() -> Arc<SchemaMetadataManager> {
let kv_backend = Arc::new(MemoryKvBackend::new());
let table_schema_cache = Arc::new(new_table_schema_cache(
"table_schema_name_cache".to_string(),
CacheBuilder::default().build(),
kv_backend.clone(),
));
let schema_cache = Arc::new(new_schema_cache(
"schema_cache".to_string(),
CacheBuilder::default().build(),
kv_backend.clone(),
));
Arc::new(SchemaMetadataManager::new(
kv_backend as KvBackendRef,
table_schema_cache,
schema_cache,
))
}

View File

@@ -19,7 +19,10 @@ use std::time::Duration;
use api::v1::region::region_server::RegionServer;
use arrow_flight::flight_service_server::FlightServiceServer;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use cache::{
build_datanode_cache_registry, build_fundamental_cache_registry,
with_default_composite_cache_registry,
};
use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
use client::client_manager::NodeClients;
use client::Client;
@@ -27,6 +30,7 @@ use cmd::DistributedInformationExtension;
use common_base::Plugins;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_meta::kv_backend::chroot::ChrootKvBackend;
@@ -42,7 +46,6 @@ use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
use datanode::config::{DatanodeOptions, ObjectStoreConfig};
use datanode::datanode::{Datanode, DatanodeBuilder, ProcedureConfig};
use frontend::frontend::FrontendOptions;
use frontend::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler;
use frontend::heartbeat::HeartbeatTask;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{FrontendInstance, Instance as FeInstance};
@@ -328,8 +331,15 @@ impl GreptimeDbClusterBuilder {
client: meta_client.clone(),
});
let layered_cache_registry = Arc::new(
LayeredCacheRegistryBuilder::default()
.add_cache_registry(build_datanode_cache_registry(meta_backend.clone()))
.build(),
);
let mut datanode = DatanodeBuilder::new(opts, Plugins::default())
.with_kv_backend(meta_backend)
.with_cache_registry(layered_cache_registry)
.with_meta_client(meta_client)
.build()
.await
@@ -382,7 +392,7 @@ impl GreptimeDbClusterBuilder {
let handlers_executor = HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler),
Arc::new(InvalidateTableCacheHandler::new(cache_registry.clone())),
Arc::new(InvalidateCacheHandler::new(cache_registry.clone())),
]);
let options = FrontendOptions::default();

View File

@@ -14,7 +14,10 @@
use std::sync::Arc;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use cache::{
build_datanode_cache_registry, build_fundamental_cache_registry,
with_default_composite_cache_registry,
};
use catalog::information_schema::NoopInformationExtension;
use catalog::kvbackend::KvBackendCatalogManager;
use cmd::error::StartFlownodeSnafu;
@@ -125,8 +128,15 @@ impl GreptimeDbStandaloneBuilder {
) -> GreptimeDbStandalone {
let plugins = self.plugin.clone().unwrap_or_default();
let layered_cache_registry = Arc::new(
LayeredCacheRegistryBuilder::default()
.add_cache_registry(build_datanode_cache_registry(kv_backend.clone()))
.build(),
);
let datanode = DatanodeBuilder::new(opts.datanode_options(), plugins.clone())
.with_kv_backend(kv_backend.clone())
.with_cache_registry(layered_cache_registry)
.build()
.await
.unwrap();