mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
feat: use cache kv manager for SchemaMetadataManager (#5053)
* feat: add cache for schema options * fix/use-cache-kv-manager: Add cache invalidation handling to Datanode's heartbeat task • Implement InvalidateSchemaCacheHandler in heartbeat.rs to handle cache invalidation instructions. • Update HeartbeatTask constructor to accept cached_kv_backend and pass it to InvalidateSchemaCacheHandler. • Modify DatanodeBuilder to clone cached_kv_backend when creating schema_metadata_manager. • Refactor MetasrvCacheInvalidator in cache_invalidator.rs to reuse MailboxMessage for broadcasting to different channels. * fix: only remove schema related cache entries * chore: add more tests * fix/use-cache-kv-manager: Moved InvalidateSchemaCacheHandler to a separate module • Extracted InvalidateSchemaCacheHandler and associated tests into a new file cache_invalidator.rs • Removed async_trait and CacheInvalidator related code from heartbeat.rs • Added cache_invalidator module declaration in handler.rs * fix: unit tests * fix/use-cache-kv-manager: Standardize TODO comment format in CachedKvBackend txn method * Update src/datanode/src/heartbeat/handler/cache_invalidator.rs * Update src/datanode/src/heartbeat/handler/cache_invalidator.rs * Update src/datanode/src/heartbeat/handler/cache_invalidator.rs --------- Co-authored-by: jeremyhi <jiachun_feng@proton.me>
This commit is contained in:
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
pub use client::{CachedMetaKvBackend, CachedMetaKvBackendBuilder, MetaKvBackend};
|
||||
pub use client::{CachedKvBackend, CachedKvBackendBuilder, MetaKvBackend};
|
||||
|
||||
mod client;
|
||||
mod manager;
|
||||
|
||||
@@ -22,6 +22,7 @@ use common_error::ext::BoxedError;
|
||||
use common_meta::cache_invalidator::KvCacheInvalidator;
|
||||
use common_meta::error::Error::CacheNotGet;
|
||||
use common_meta::error::{CacheNotGetSnafu, Error, ExternalSnafu, GetKvCacheSnafu, Result};
|
||||
use common_meta::kv_backend::txn::{Txn, TxnResponse};
|
||||
use common_meta::kv_backend::{KvBackend, KvBackendRef, TxnService};
|
||||
use common_meta::rpc::store::{
|
||||
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
|
||||
@@ -42,20 +43,20 @@ const DEFAULT_CACHE_MAX_CAPACITY: u64 = 10000;
|
||||
const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(10 * 60);
|
||||
const DEFAULT_CACHE_TTI: Duration = Duration::from_secs(5 * 60);
|
||||
|
||||
pub struct CachedMetaKvBackendBuilder {
|
||||
pub struct CachedKvBackendBuilder {
|
||||
cache_max_capacity: Option<u64>,
|
||||
cache_ttl: Option<Duration>,
|
||||
cache_tti: Option<Duration>,
|
||||
meta_client: Arc<MetaClient>,
|
||||
inner: KvBackendRef,
|
||||
}
|
||||
|
||||
impl CachedMetaKvBackendBuilder {
|
||||
pub fn new(meta_client: Arc<MetaClient>) -> Self {
|
||||
impl CachedKvBackendBuilder {
|
||||
pub fn new(inner: KvBackendRef) -> Self {
|
||||
Self {
|
||||
cache_max_capacity: None,
|
||||
cache_ttl: None,
|
||||
cache_tti: None,
|
||||
meta_client,
|
||||
inner,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -74,7 +75,7 @@ impl CachedMetaKvBackendBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn build(self) -> CachedMetaKvBackend {
|
||||
pub fn build(self) -> CachedKvBackend {
|
||||
let cache_max_capacity = self
|
||||
.cache_max_capacity
|
||||
.unwrap_or(DEFAULT_CACHE_MAX_CAPACITY);
|
||||
@@ -85,14 +86,11 @@ impl CachedMetaKvBackendBuilder {
|
||||
.time_to_live(cache_ttl)
|
||||
.time_to_idle(cache_tti)
|
||||
.build();
|
||||
|
||||
let kv_backend = Arc::new(MetaKvBackend {
|
||||
client: self.meta_client,
|
||||
});
|
||||
let kv_backend = self.inner;
|
||||
let name = format!("CachedKvBackend({})", kv_backend.name());
|
||||
let version = AtomicUsize::new(0);
|
||||
|
||||
CachedMetaKvBackend {
|
||||
CachedKvBackend {
|
||||
kv_backend,
|
||||
cache,
|
||||
name,
|
||||
@@ -112,19 +110,29 @@ pub type CacheBackend = Cache<Vec<u8>, KeyValue>;
|
||||
/// Therefore, it is recommended to use CachedMetaKvBackend to only read metadata related
|
||||
/// information. Note: If you read other information, you may read expired data, which depends on
|
||||
/// TTL and TTI for cache.
|
||||
pub struct CachedMetaKvBackend {
|
||||
pub struct CachedKvBackend {
|
||||
kv_backend: KvBackendRef,
|
||||
cache: CacheBackend,
|
||||
name: String,
|
||||
version: AtomicUsize,
|
||||
}
|
||||
|
||||
impl TxnService for CachedMetaKvBackend {
|
||||
#[async_trait::async_trait]
|
||||
impl TxnService for CachedKvBackend {
|
||||
type Error = Error;
|
||||
|
||||
async fn txn(&self, txn: Txn) -> std::result::Result<TxnResponse, Self::Error> {
|
||||
// TODO(hl): txn of CachedKvBackend simply pass through to inner backend without invalidating caches.
|
||||
self.kv_backend.txn(txn).await
|
||||
}
|
||||
|
||||
fn max_txn_ops(&self) -> usize {
|
||||
self.kv_backend.max_txn_ops()
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl KvBackend for CachedMetaKvBackend {
|
||||
impl KvBackend for CachedKvBackend {
|
||||
fn name(&self) -> &str {
|
||||
&self.name
|
||||
}
|
||||
@@ -305,7 +313,7 @@ impl KvBackend for CachedMetaKvBackend {
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl KvCacheInvalidator for CachedMetaKvBackend {
|
||||
impl KvCacheInvalidator for CachedKvBackend {
|
||||
async fn invalidate_key(&self, key: &[u8]) {
|
||||
self.create_new_version();
|
||||
self.cache.invalidate(key).await;
|
||||
@@ -313,7 +321,7 @@ impl KvCacheInvalidator for CachedMetaKvBackend {
|
||||
}
|
||||
}
|
||||
|
||||
impl CachedMetaKvBackend {
|
||||
impl CachedKvBackend {
|
||||
// only for test
|
||||
#[cfg(test)]
|
||||
fn wrap(kv_backend: KvBackendRef) -> Self {
|
||||
@@ -466,7 +474,7 @@ mod tests {
|
||||
use common_meta::rpc::KeyValue;
|
||||
use dashmap::DashMap;
|
||||
|
||||
use super::CachedMetaKvBackend;
|
||||
use super::CachedKvBackend;
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct SimpleKvBackend {
|
||||
@@ -540,7 +548,7 @@ mod tests {
|
||||
async fn test_cached_kv_backend() {
|
||||
let simple_kv = Arc::new(SimpleKvBackend::default());
|
||||
let get_execute_times = simple_kv.get_execute_times.clone();
|
||||
let cached_kv = CachedMetaKvBackend::wrap(simple_kv);
|
||||
let cached_kv = CachedKvBackend::wrap(simple_kv);
|
||||
|
||||
add_some_vals(&cached_kv).await;
|
||||
|
||||
|
||||
@@ -21,13 +21,14 @@ use cache::{
|
||||
TABLE_ROUTE_CACHE_NAME,
|
||||
};
|
||||
use catalog::kvbackend::{
|
||||
CachedMetaKvBackend, CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend,
|
||||
CachedKvBackend, CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend,
|
||||
};
|
||||
use client::{Client, Database, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_base::Plugins;
|
||||
use common_config::Mode;
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
|
||||
use common_meta::kv_backend::KvBackendRef;
|
||||
use common_query::Output;
|
||||
use common_recordbatch::RecordBatches;
|
||||
use common_telemetry::debug;
|
||||
@@ -258,8 +259,9 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {
|
||||
.context(StartMetaClientSnafu)?;
|
||||
let meta_client = Arc::new(meta_client);
|
||||
|
||||
let cached_meta_backend =
|
||||
Arc::new(CachedMetaKvBackendBuilder::new(meta_client.clone()).build());
|
||||
let cached_meta_backend = Arc::new(
|
||||
CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone()))).build(),
|
||||
);
|
||||
let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
|
||||
CacheRegistryBuilder::default()
|
||||
.add_cache(cached_meta_backend.clone())
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
|
||||
use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
|
||||
use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
|
||||
use clap::Parser;
|
||||
use client::client_manager::NodeClients;
|
||||
use common_base::Plugins;
|
||||
@@ -246,11 +246,12 @@ impl StartCommand {
|
||||
let cache_tti = meta_config.metadata_cache_tti;
|
||||
|
||||
// TODO(discord9): add helper function to ease the creation of cache registry&such
|
||||
let cached_meta_backend = CachedMetaKvBackendBuilder::new(meta_client.clone())
|
||||
.cache_max_capacity(cache_max_capacity)
|
||||
.cache_ttl(cache_ttl)
|
||||
.cache_tti(cache_tti)
|
||||
.build();
|
||||
let cached_meta_backend =
|
||||
CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))
|
||||
.cache_max_capacity(cache_max_capacity)
|
||||
.cache_ttl(cache_ttl)
|
||||
.cache_tti(cache_tti)
|
||||
.build();
|
||||
let cached_meta_backend = Arc::new(cached_meta_backend);
|
||||
|
||||
// Builds cache registry
|
||||
|
||||
@@ -17,7 +17,7 @@ use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
|
||||
use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
|
||||
use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
|
||||
use clap::Parser;
|
||||
use client::client_manager::NodeClients;
|
||||
use common_base::Plugins;
|
||||
@@ -293,11 +293,12 @@ impl StartCommand {
|
||||
.context(MetaClientInitSnafu)?;
|
||||
|
||||
// TODO(discord9): add helper function to ease the creation of cache registry&such
|
||||
let cached_meta_backend = CachedMetaKvBackendBuilder::new(meta_client.clone())
|
||||
.cache_max_capacity(cache_max_capacity)
|
||||
.cache_ttl(cache_ttl)
|
||||
.cache_tti(cache_tti)
|
||||
.build();
|
||||
let cached_meta_backend =
|
||||
CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))
|
||||
.cache_max_capacity(cache_max_capacity)
|
||||
.cache_ttl(cache_ttl)
|
||||
.cache_tti(cache_tti)
|
||||
.build();
|
||||
let cached_meta_backend = Arc::new(cached_meta_backend);
|
||||
|
||||
// Builds cache registry
|
||||
|
||||
@@ -18,6 +18,7 @@ use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use catalog::kvbackend::CachedKvBackendBuilder;
|
||||
use catalog::memory::MemoryCatalogManager;
|
||||
use common_base::Plugins;
|
||||
use common_error::ext::BoxedError;
|
||||
@@ -208,7 +209,10 @@ impl DatanodeBuilder {
|
||||
(Box::new(NoopRegionServerEventListener) as _, None)
|
||||
};
|
||||
|
||||
let schema_metadata_manager = Arc::new(SchemaMetadataManager::new(kv_backend.clone()));
|
||||
let cached_kv_backend = Arc::new(CachedKvBackendBuilder::new(kv_backend.clone()).build());
|
||||
|
||||
let schema_metadata_manager =
|
||||
Arc::new(SchemaMetadataManager::new(cached_kv_backend.clone()));
|
||||
let region_server = self
|
||||
.new_region_server(schema_metadata_manager, region_event_listener)
|
||||
.await?;
|
||||
@@ -239,7 +243,15 @@ impl DatanodeBuilder {
|
||||
}
|
||||
|
||||
let heartbeat_task = if let Some(meta_client) = meta_client {
|
||||
Some(HeartbeatTask::try_new(&self.opts, region_server.clone(), meta_client).await?)
|
||||
Some(
|
||||
HeartbeatTask::try_new(
|
||||
&self.opts,
|
||||
region_server.clone(),
|
||||
meta_client,
|
||||
cached_kv_backend,
|
||||
)
|
||||
.await?,
|
||||
)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
@@ -18,6 +18,7 @@ use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat};
|
||||
use catalog::kvbackend::CachedKvBackend;
|
||||
use common_meta::datanode::REGION_STATISTIC_KEY;
|
||||
use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
|
||||
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
|
||||
@@ -39,6 +40,7 @@ use crate::alive_keeper::RegionAliveKeeper;
|
||||
use crate::config::DatanodeOptions;
|
||||
use crate::error::{self, MetaClientInitSnafu, Result};
|
||||
use crate::event_listener::RegionServerEventReceiver;
|
||||
use crate::heartbeat::handler::cache_invalidator::InvalidateSchemaCacheHandler;
|
||||
use crate::metrics::{self, HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
|
||||
use crate::region_server::RegionServer;
|
||||
|
||||
@@ -70,6 +72,7 @@ impl HeartbeatTask {
|
||||
opts: &DatanodeOptions,
|
||||
region_server: RegionServer,
|
||||
meta_client: MetaClientRef,
|
||||
cache_kv_backend: Arc<CachedKvBackend>,
|
||||
) -> Result<Self> {
|
||||
let region_alive_keeper = Arc::new(RegionAliveKeeper::new(
|
||||
region_server.clone(),
|
||||
@@ -79,6 +82,7 @@ impl HeartbeatTask {
|
||||
region_alive_keeper.clone(),
|
||||
Arc::new(ParseMailboxMessageHandler),
|
||||
Arc::new(RegionHeartbeatResponseHandler::new(region_server.clone())),
|
||||
Arc::new(InvalidateSchemaCacheHandler::new(cache_kv_backend)),
|
||||
]));
|
||||
|
||||
Ok(Self {
|
||||
|
||||
@@ -24,6 +24,7 @@ use futures::future::BoxFuture;
|
||||
use snafu::OptionExt;
|
||||
use store_api::storage::RegionId;
|
||||
|
||||
pub(crate) mod cache_invalidator;
|
||||
mod close_region;
|
||||
mod downgrade_region;
|
||||
mod open_region;
|
||||
@@ -134,7 +135,7 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
|
||||
}
|
||||
});
|
||||
|
||||
Ok(HandleControl::Done)
|
||||
Ok(HandleControl::Continue)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -285,7 +286,7 @@ mod tests {
|
||||
|
||||
let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
|
||||
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
|
||||
assert_matches!(control, HandleControl::Done);
|
||||
assert_matches!(control, HandleControl::Continue);
|
||||
|
||||
let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
|
||||
|
||||
@@ -340,7 +341,7 @@ mod tests {
|
||||
|
||||
let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
|
||||
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
|
||||
assert_matches!(control, HandleControl::Done);
|
||||
assert_matches!(control, HandleControl::Continue);
|
||||
|
||||
let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
|
||||
|
||||
@@ -373,7 +374,7 @@ mod tests {
|
||||
|
||||
let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
|
||||
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
|
||||
assert_matches!(control, HandleControl::Done);
|
||||
assert_matches!(control, HandleControl::Continue);
|
||||
|
||||
let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
|
||||
|
||||
@@ -420,7 +421,7 @@ mod tests {
|
||||
|
||||
let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
|
||||
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
|
||||
assert_matches!(control, HandleControl::Done);
|
||||
assert_matches!(control, HandleControl::Continue);
|
||||
|
||||
let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
|
||||
|
||||
@@ -442,7 +443,7 @@ mod tests {
|
||||
});
|
||||
let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
|
||||
let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
|
||||
assert_matches!(control, HandleControl::Done);
|
||||
assert_matches!(control, HandleControl::Continue);
|
||||
|
||||
let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
|
||||
|
||||
|
||||
167
src/datanode/src/heartbeat/handler/cache_invalidator.rs
Normal file
167
src/datanode/src/heartbeat/handler/cache_invalidator.rs
Normal file
@@ -0,0 +1,167 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Schema cache invalidator handler
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use catalog::kvbackend::CachedKvBackend;
|
||||
use common_meta::cache_invalidator::KvCacheInvalidator;
|
||||
use common_meta::heartbeat::handler::{
|
||||
HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
|
||||
};
|
||||
use common_meta::instruction::{CacheIdent, Instruction};
|
||||
use common_meta::key::schema_name::SchemaNameKey;
|
||||
use common_meta::key::MetadataKey;
|
||||
use common_telemetry::debug;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct InvalidateSchemaCacheHandler {
|
||||
cached_kv_backend: Arc<CachedKvBackend>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl HeartbeatResponseHandler for InvalidateSchemaCacheHandler {
|
||||
fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
|
||||
matches!(
|
||||
ctx.incoming_message.as_ref(),
|
||||
Some((_, Instruction::InvalidateCaches(_)))
|
||||
)
|
||||
}
|
||||
|
||||
async fn handle(
|
||||
&self,
|
||||
ctx: &mut HeartbeatResponseHandlerContext,
|
||||
) -> common_meta::error::Result<HandleControl> {
|
||||
let Some((_, Instruction::InvalidateCaches(caches))) = ctx.incoming_message.take() else {
|
||||
unreachable!("InvalidateSchemaCacheHandler: should be guarded by 'is_acceptable'")
|
||||
};
|
||||
|
||||
debug!(
|
||||
"InvalidateSchemaCacheHandler: invalidating caches: {:?}",
|
||||
caches
|
||||
);
|
||||
|
||||
for cache in caches {
|
||||
let CacheIdent::SchemaName(schema_name) = cache else {
|
||||
continue;
|
||||
};
|
||||
let key: SchemaNameKey = (&schema_name).into();
|
||||
let key_bytes = key.to_bytes();
|
||||
// invalidate cache
|
||||
self.cached_kv_backend.invalidate_key(&key_bytes).await;
|
||||
}
|
||||
|
||||
Ok(HandleControl::Done)
|
||||
}
|
||||
}
|
||||
|
||||
impl InvalidateSchemaCacheHandler {
|
||||
pub fn new(cached_kv_backend: Arc<CachedKvBackend>) -> Self {
|
||||
Self { cached_kv_backend }
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use api::v1::meta::HeartbeatResponse;
|
||||
use catalog::kvbackend::CachedKvBackendBuilder;
|
||||
use common_meta::heartbeat::handler::{
|
||||
HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor,
|
||||
};
|
||||
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta};
|
||||
use common_meta::instruction::{CacheIdent, Instruction};
|
||||
use common_meta::key::schema_name::{SchemaName, SchemaNameKey, SchemaNameValue};
|
||||
use common_meta::key::{MetadataKey, SchemaMetadataManager};
|
||||
use common_meta::kv_backend::memory::MemoryKvBackend;
|
||||
use common_meta::kv_backend::KvBackend;
|
||||
use common_meta::rpc::store::PutRequest;
|
||||
|
||||
use crate::heartbeat::handler::cache_invalidator::InvalidateSchemaCacheHandler;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_invalidate_schema_cache_handler() {
|
||||
let inner_kv = Arc::new(MemoryKvBackend::default());
|
||||
let cached_kv = Arc::new(CachedKvBackendBuilder::new(inner_kv.clone()).build());
|
||||
let schema_metadata_manager = SchemaMetadataManager::new(cached_kv.clone());
|
||||
|
||||
let schema_name = "test_schema";
|
||||
let catalog_name = "test_catalog";
|
||||
schema_metadata_manager
|
||||
.register_region_table_info(
|
||||
1,
|
||||
"test_table",
|
||||
schema_name,
|
||||
catalog_name,
|
||||
Some(SchemaNameValue {
|
||||
ttl: Some(Duration::from_secs(1)),
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
|
||||
schema_metadata_manager
|
||||
.get_schema_options_by_table_id(1)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let schema_key = SchemaNameKey::new(catalog_name, schema_name).to_bytes();
|
||||
let new_schema_value = SchemaNameValue {
|
||||
ttl: Some(Duration::from_secs(3)),
|
||||
}
|
||||
.try_as_raw_value()
|
||||
.unwrap();
|
||||
inner_kv
|
||||
.put(PutRequest {
|
||||
key: schema_key.clone(),
|
||||
value: new_schema_value,
|
||||
prev_kv: false,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let executor = Arc::new(HandlerGroupExecutor::new(vec![Arc::new(
|
||||
InvalidateSchemaCacheHandler::new(cached_kv),
|
||||
)]));
|
||||
|
||||
let (tx, _) = tokio::sync::mpsc::channel(8);
|
||||
let mailbox = Arc::new(HeartbeatMailbox::new(tx));
|
||||
|
||||
// removes a valid key
|
||||
let response = HeartbeatResponse::default();
|
||||
let mut ctx: HeartbeatResponseHandlerContext =
|
||||
HeartbeatResponseHandlerContext::new(mailbox, response);
|
||||
ctx.incoming_message = Some((
|
||||
MessageMeta::new_test(1, "hi", "foo", "bar"),
|
||||
Instruction::InvalidateCaches(vec![CacheIdent::SchemaName(SchemaName {
|
||||
catalog_name: catalog_name.to_string(),
|
||||
schema_name: schema_name.to_string(),
|
||||
})]),
|
||||
));
|
||||
executor.handle(ctx).await.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
Some(Duration::from_secs(3)),
|
||||
SchemaNameValue::try_from_raw_value(
|
||||
&inner_kv.get(&schema_key).await.unwrap().unwrap().value
|
||||
)
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
.ttl
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -44,7 +44,7 @@ impl MetasrvCacheInvalidator {
|
||||
.clone()
|
||||
.unwrap_or_else(|| DEFAULT_SUBJECT.to_string());
|
||||
|
||||
let msg = &MailboxMessage::json_message(
|
||||
let mut msg = MailboxMessage::json_message(
|
||||
subject,
|
||||
&format!("Metasrv@{}", self.info.server_addr),
|
||||
"Frontend broadcast",
|
||||
@@ -54,22 +54,21 @@ impl MetasrvCacheInvalidator {
|
||||
.with_context(|_| meta_error::SerdeJsonSnafu)?;
|
||||
|
||||
self.mailbox
|
||||
.broadcast(&BroadcastChannel::Frontend, msg)
|
||||
.broadcast(&BroadcastChannel::Frontend, &msg)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(meta_error::ExternalSnafu)?;
|
||||
|
||||
let msg = &MailboxMessage::json_message(
|
||||
subject,
|
||||
&format!("Metasrv@{}", self.info.server_addr),
|
||||
"Flownode broadcast",
|
||||
common_time::util::current_time_millis(),
|
||||
&instruction,
|
||||
)
|
||||
.with_context(|_| meta_error::SerdeJsonSnafu)?;
|
||||
|
||||
msg.to = "Datanode broadcast".to_string();
|
||||
self.mailbox
|
||||
.broadcast(&BroadcastChannel::Flownode, msg)
|
||||
.broadcast(&BroadcastChannel::Datanode, &msg)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(meta_error::ExternalSnafu)?;
|
||||
|
||||
msg.to = "Flownode broadcast".to_string();
|
||||
self.mailbox
|
||||
.broadcast(&BroadcastChannel::Flownode, &msg)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(meta_error::ExternalSnafu)
|
||||
|
||||
@@ -20,7 +20,7 @@ use std::time::Duration;
|
||||
use api::v1::region::region_server::RegionServer;
|
||||
use arrow_flight::flight_service_server::FlightServiceServer;
|
||||
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
|
||||
use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
|
||||
use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
|
||||
use client::client_manager::NodeClients;
|
||||
use client::Client;
|
||||
use cmd::DistributedInformationExtension;
|
||||
@@ -351,8 +351,9 @@ impl GreptimeDbClusterBuilder {
|
||||
meta_client.start(&[&metasrv.server_addr]).await.unwrap();
|
||||
let meta_client = Arc::new(meta_client);
|
||||
|
||||
let cached_meta_backend =
|
||||
Arc::new(CachedMetaKvBackendBuilder::new(meta_client.clone()).build());
|
||||
let cached_meta_backend = Arc::new(
|
||||
CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone()))).build(),
|
||||
);
|
||||
|
||||
let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
|
||||
CacheRegistryBuilder::default()
|
||||
|
||||
Reference in New Issue
Block a user