diff --git a/src/catalog/src/kvbackend.rs b/src/catalog/src/kvbackend.rs index 4414b091c3..b2a79b5e7f 100644 --- a/src/catalog/src/kvbackend.rs +++ b/src/catalog/src/kvbackend.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub use client::{CachedMetaKvBackend, CachedMetaKvBackendBuilder, MetaKvBackend}; +pub use client::{CachedKvBackend, CachedKvBackendBuilder, MetaKvBackend}; mod client; mod manager; diff --git a/src/catalog/src/kvbackend/client.rs b/src/catalog/src/kvbackend/client.rs index 46e13d03c2..7a09861850 100644 --- a/src/catalog/src/kvbackend/client.rs +++ b/src/catalog/src/kvbackend/client.rs @@ -22,6 +22,7 @@ use common_error::ext::BoxedError; use common_meta::cache_invalidator::KvCacheInvalidator; use common_meta::error::Error::CacheNotGet; use common_meta::error::{CacheNotGetSnafu, Error, ExternalSnafu, GetKvCacheSnafu, Result}; +use common_meta::kv_backend::txn::{Txn, TxnResponse}; use common_meta::kv_backend::{KvBackend, KvBackendRef, TxnService}; use common_meta::rpc::store::{ BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, @@ -42,20 +43,20 @@ const DEFAULT_CACHE_MAX_CAPACITY: u64 = 10000; const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(10 * 60); const DEFAULT_CACHE_TTI: Duration = Duration::from_secs(5 * 60); -pub struct CachedMetaKvBackendBuilder { +pub struct CachedKvBackendBuilder { cache_max_capacity: Option, cache_ttl: Option, cache_tti: Option, - meta_client: Arc, + inner: KvBackendRef, } -impl CachedMetaKvBackendBuilder { - pub fn new(meta_client: Arc) -> Self { +impl CachedKvBackendBuilder { + pub fn new(inner: KvBackendRef) -> Self { Self { cache_max_capacity: None, cache_ttl: None, cache_tti: None, - meta_client, + inner, } } @@ -74,7 +75,7 @@ impl CachedMetaKvBackendBuilder { self } - pub fn build(self) -> CachedMetaKvBackend { + pub fn build(self) -> CachedKvBackend { let cache_max_capacity = self .cache_max_capacity .unwrap_or(DEFAULT_CACHE_MAX_CAPACITY); @@ -85,14 +86,11 @@ impl CachedMetaKvBackendBuilder { .time_to_live(cache_ttl) .time_to_idle(cache_tti) .build(); - - let kv_backend = Arc::new(MetaKvBackend { - client: self.meta_client, - }); + let kv_backend = self.inner; let name = format!("CachedKvBackend({})", kv_backend.name()); let version = AtomicUsize::new(0); - CachedMetaKvBackend { + CachedKvBackend { kv_backend, cache, name, @@ -112,19 +110,29 @@ pub type CacheBackend = Cache, KeyValue>; /// Therefore, it is recommended to use CachedMetaKvBackend to only read metadata related /// information. Note: If you read other information, you may read expired data, which depends on /// TTL and TTI for cache. -pub struct CachedMetaKvBackend { +pub struct CachedKvBackend { kv_backend: KvBackendRef, cache: CacheBackend, name: String, version: AtomicUsize, } -impl TxnService for CachedMetaKvBackend { +#[async_trait::async_trait] +impl TxnService for CachedKvBackend { type Error = Error; + + async fn txn(&self, txn: Txn) -> std::result::Result { + // TODO(hl): txn of CachedKvBackend simply pass through to inner backend without invalidating caches. + self.kv_backend.txn(txn).await + } + + fn max_txn_ops(&self) -> usize { + self.kv_backend.max_txn_ops() + } } #[async_trait::async_trait] -impl KvBackend for CachedMetaKvBackend { +impl KvBackend for CachedKvBackend { fn name(&self) -> &str { &self.name } @@ -305,7 +313,7 @@ impl KvBackend for CachedMetaKvBackend { } #[async_trait::async_trait] -impl KvCacheInvalidator for CachedMetaKvBackend { +impl KvCacheInvalidator for CachedKvBackend { async fn invalidate_key(&self, key: &[u8]) { self.create_new_version(); self.cache.invalidate(key).await; @@ -313,7 +321,7 @@ impl KvCacheInvalidator for CachedMetaKvBackend { } } -impl CachedMetaKvBackend { +impl CachedKvBackend { // only for test #[cfg(test)] fn wrap(kv_backend: KvBackendRef) -> Self { @@ -466,7 +474,7 @@ mod tests { use common_meta::rpc::KeyValue; use dashmap::DashMap; - use super::CachedMetaKvBackend; + use super::CachedKvBackend; #[derive(Default)] pub struct SimpleKvBackend { @@ -540,7 +548,7 @@ mod tests { async fn test_cached_kv_backend() { let simple_kv = Arc::new(SimpleKvBackend::default()); let get_execute_times = simple_kv.get_execute_times.clone(); - let cached_kv = CachedMetaKvBackend::wrap(simple_kv); + let cached_kv = CachedKvBackend::wrap(simple_kv); add_some_vals(&cached_kv).await; diff --git a/src/cmd/src/cli/repl.rs b/src/cmd/src/cli/repl.rs index 0f8e5b0450..8c6e154a26 100644 --- a/src/cmd/src/cli/repl.rs +++ b/src/cmd/src/cli/repl.rs @@ -21,13 +21,14 @@ use cache::{ TABLE_ROUTE_CACHE_NAME, }; use catalog::kvbackend::{ - CachedMetaKvBackend, CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend, + CachedKvBackend, CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend, }; use client::{Client, Database, OutputData, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_base::Plugins; use common_config::Mode; use common_error::ext::ErrorExt; use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder}; +use common_meta::kv_backend::KvBackendRef; use common_query::Output; use common_recordbatch::RecordBatches; use common_telemetry::debug; @@ -258,8 +259,9 @@ async fn create_query_engine(meta_addr: &str) -> Result { .context(StartMetaClientSnafu)?; let meta_client = Arc::new(meta_client); - let cached_meta_backend = - Arc::new(CachedMetaKvBackendBuilder::new(meta_client.clone()).build()); + let cached_meta_backend = Arc::new( + CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone()))).build(), + ); let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry( CacheRegistryBuilder::default() .add_cache(cached_meta_backend.clone()) diff --git a/src/cmd/src/flownode.rs b/src/cmd/src/flownode.rs index d2a8454085..18fbf51846 100644 --- a/src/cmd/src/flownode.rs +++ b/src/cmd/src/flownode.rs @@ -15,7 +15,7 @@ use std::sync::Arc; use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry}; -use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend}; +use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend}; use clap::Parser; use client::client_manager::NodeClients; use common_base::Plugins; @@ -246,11 +246,12 @@ impl StartCommand { let cache_tti = meta_config.metadata_cache_tti; // TODO(discord9): add helper function to ease the creation of cache registry&such - let cached_meta_backend = CachedMetaKvBackendBuilder::new(meta_client.clone()) - .cache_max_capacity(cache_max_capacity) - .cache_ttl(cache_ttl) - .cache_tti(cache_tti) - .build(); + let cached_meta_backend = + CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone()))) + .cache_max_capacity(cache_max_capacity) + .cache_ttl(cache_ttl) + .cache_tti(cache_tti) + .build(); let cached_meta_backend = Arc::new(cached_meta_backend); // Builds cache registry diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index 728d584b3c..dc4645dfa1 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -17,7 +17,7 @@ use std::time::Duration; use async_trait::async_trait; use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry}; -use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend}; +use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend}; use clap::Parser; use client::client_manager::NodeClients; use common_base::Plugins; @@ -293,11 +293,12 @@ impl StartCommand { .context(MetaClientInitSnafu)?; // TODO(discord9): add helper function to ease the creation of cache registry&such - let cached_meta_backend = CachedMetaKvBackendBuilder::new(meta_client.clone()) - .cache_max_capacity(cache_max_capacity) - .cache_ttl(cache_ttl) - .cache_tti(cache_tti) - .build(); + let cached_meta_backend = + CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone()))) + .cache_max_capacity(cache_max_capacity) + .cache_ttl(cache_ttl) + .cache_tti(cache_tti) + .build(); let cached_meta_backend = Arc::new(cached_meta_backend); // Builds cache registry diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index e679678745..604fee7dfd 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -18,6 +18,7 @@ use std::path::Path; use std::sync::Arc; use std::time::Duration; +use catalog::kvbackend::CachedKvBackendBuilder; use catalog::memory::MemoryCatalogManager; use common_base::Plugins; use common_error::ext::BoxedError; @@ -208,7 +209,10 @@ impl DatanodeBuilder { (Box::new(NoopRegionServerEventListener) as _, None) }; - let schema_metadata_manager = Arc::new(SchemaMetadataManager::new(kv_backend.clone())); + let cached_kv_backend = Arc::new(CachedKvBackendBuilder::new(kv_backend.clone()).build()); + + let schema_metadata_manager = + Arc::new(SchemaMetadataManager::new(cached_kv_backend.clone())); let region_server = self .new_region_server(schema_metadata_manager, region_event_listener) .await?; @@ -239,7 +243,15 @@ impl DatanodeBuilder { } let heartbeat_task = if let Some(meta_client) = meta_client { - Some(HeartbeatTask::try_new(&self.opts, region_server.clone(), meta_client).await?) + Some( + HeartbeatTask::try_new( + &self.opts, + region_server.clone(), + meta_client, + cached_kv_backend, + ) + .await?, + ) } else { None }; diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index ef9af0acdd..3bd2ba2ef1 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use std::time::Duration; use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat}; +use catalog::kvbackend::CachedKvBackend; use common_meta::datanode::REGION_STATISTIC_KEY; use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; @@ -39,6 +40,7 @@ use crate::alive_keeper::RegionAliveKeeper; use crate::config::DatanodeOptions; use crate::error::{self, MetaClientInitSnafu, Result}; use crate::event_listener::RegionServerEventReceiver; +use crate::heartbeat::handler::cache_invalidator::InvalidateSchemaCacheHandler; use crate::metrics::{self, HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT}; use crate::region_server::RegionServer; @@ -70,6 +72,7 @@ impl HeartbeatTask { opts: &DatanodeOptions, region_server: RegionServer, meta_client: MetaClientRef, + cache_kv_backend: Arc, ) -> Result { let region_alive_keeper = Arc::new(RegionAliveKeeper::new( region_server.clone(), @@ -79,6 +82,7 @@ impl HeartbeatTask { region_alive_keeper.clone(), Arc::new(ParseMailboxMessageHandler), Arc::new(RegionHeartbeatResponseHandler::new(region_server.clone())), + Arc::new(InvalidateSchemaCacheHandler::new(cache_kv_backend)), ])); Ok(Self { diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index 89b6991788..d902ae98ea 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -24,6 +24,7 @@ use futures::future::BoxFuture; use snafu::OptionExt; use store_api::storage::RegionId; +pub(crate) mod cache_invalidator; mod close_region; mod downgrade_region; mod open_region; @@ -134,7 +135,7 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler { } }); - Ok(HandleControl::Done) + Ok(HandleControl::Continue) } } @@ -285,7 +286,7 @@ mod tests { let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction)); let control = heartbeat_handler.handle(&mut ctx).await.unwrap(); - assert_matches!(control, HandleControl::Done); + assert_matches!(control, HandleControl::Continue); let (_, reply) = heartbeat_env.receiver.recv().await.unwrap(); @@ -340,7 +341,7 @@ mod tests { let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction)); let control = heartbeat_handler.handle(&mut ctx).await.unwrap(); - assert_matches!(control, HandleControl::Done); + assert_matches!(control, HandleControl::Continue); let (_, reply) = heartbeat_env.receiver.recv().await.unwrap(); @@ -373,7 +374,7 @@ mod tests { let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction)); let control = heartbeat_handler.handle(&mut ctx).await.unwrap(); - assert_matches!(control, HandleControl::Done); + assert_matches!(control, HandleControl::Continue); let (_, reply) = heartbeat_env.receiver.recv().await.unwrap(); @@ -420,7 +421,7 @@ mod tests { let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction)); let control = heartbeat_handler.handle(&mut ctx).await.unwrap(); - assert_matches!(control, HandleControl::Done); + assert_matches!(control, HandleControl::Continue); let (_, reply) = heartbeat_env.receiver.recv().await.unwrap(); @@ -442,7 +443,7 @@ mod tests { }); let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction)); let control = heartbeat_handler.handle(&mut ctx).await.unwrap(); - assert_matches!(control, HandleControl::Done); + assert_matches!(control, HandleControl::Continue); let (_, reply) = heartbeat_env.receiver.recv().await.unwrap(); diff --git a/src/datanode/src/heartbeat/handler/cache_invalidator.rs b/src/datanode/src/heartbeat/handler/cache_invalidator.rs new file mode 100644 index 0000000000..09f4c7b721 --- /dev/null +++ b/src/datanode/src/heartbeat/handler/cache_invalidator.rs @@ -0,0 +1,167 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Schema cache invalidator handler + +use std::sync::Arc; + +use async_trait::async_trait; +use catalog::kvbackend::CachedKvBackend; +use common_meta::cache_invalidator::KvCacheInvalidator; +use common_meta::heartbeat::handler::{ + HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext, +}; +use common_meta::instruction::{CacheIdent, Instruction}; +use common_meta::key::schema_name::SchemaNameKey; +use common_meta::key::MetadataKey; +use common_telemetry::debug; + +#[derive(Clone)] +pub(crate) struct InvalidateSchemaCacheHandler { + cached_kv_backend: Arc, +} + +#[async_trait] +impl HeartbeatResponseHandler for InvalidateSchemaCacheHandler { + fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool { + matches!( + ctx.incoming_message.as_ref(), + Some((_, Instruction::InvalidateCaches(_))) + ) + } + + async fn handle( + &self, + ctx: &mut HeartbeatResponseHandlerContext, + ) -> common_meta::error::Result { + let Some((_, Instruction::InvalidateCaches(caches))) = ctx.incoming_message.take() else { + unreachable!("InvalidateSchemaCacheHandler: should be guarded by 'is_acceptable'") + }; + + debug!( + "InvalidateSchemaCacheHandler: invalidating caches: {:?}", + caches + ); + + for cache in caches { + let CacheIdent::SchemaName(schema_name) = cache else { + continue; + }; + let key: SchemaNameKey = (&schema_name).into(); + let key_bytes = key.to_bytes(); + // invalidate cache + self.cached_kv_backend.invalidate_key(&key_bytes).await; + } + + Ok(HandleControl::Done) + } +} + +impl InvalidateSchemaCacheHandler { + pub fn new(cached_kv_backend: Arc) -> Self { + Self { cached_kv_backend } + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + use std::time::Duration; + + use api::v1::meta::HeartbeatResponse; + use catalog::kvbackend::CachedKvBackendBuilder; + use common_meta::heartbeat::handler::{ + HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor, + }; + use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta}; + use common_meta::instruction::{CacheIdent, Instruction}; + use common_meta::key::schema_name::{SchemaName, SchemaNameKey, SchemaNameValue}; + use common_meta::key::{MetadataKey, SchemaMetadataManager}; + use common_meta::kv_backend::memory::MemoryKvBackend; + use common_meta::kv_backend::KvBackend; + use common_meta::rpc::store::PutRequest; + + use crate::heartbeat::handler::cache_invalidator::InvalidateSchemaCacheHandler; + + #[tokio::test] + async fn test_invalidate_schema_cache_handler() { + let inner_kv = Arc::new(MemoryKvBackend::default()); + let cached_kv = Arc::new(CachedKvBackendBuilder::new(inner_kv.clone()).build()); + let schema_metadata_manager = SchemaMetadataManager::new(cached_kv.clone()); + + let schema_name = "test_schema"; + let catalog_name = "test_catalog"; + schema_metadata_manager + .register_region_table_info( + 1, + "test_table", + schema_name, + catalog_name, + Some(SchemaNameValue { + ttl: Some(Duration::from_secs(1)), + }), + ) + .await; + + schema_metadata_manager + .get_schema_options_by_table_id(1) + .await + .unwrap(); + + let schema_key = SchemaNameKey::new(catalog_name, schema_name).to_bytes(); + let new_schema_value = SchemaNameValue { + ttl: Some(Duration::from_secs(3)), + } + .try_as_raw_value() + .unwrap(); + inner_kv + .put(PutRequest { + key: schema_key.clone(), + value: new_schema_value, + prev_kv: false, + }) + .await + .unwrap(); + + let executor = Arc::new(HandlerGroupExecutor::new(vec![Arc::new( + InvalidateSchemaCacheHandler::new(cached_kv), + )])); + + let (tx, _) = tokio::sync::mpsc::channel(8); + let mailbox = Arc::new(HeartbeatMailbox::new(tx)); + + // removes a valid key + let response = HeartbeatResponse::default(); + let mut ctx: HeartbeatResponseHandlerContext = + HeartbeatResponseHandlerContext::new(mailbox, response); + ctx.incoming_message = Some(( + MessageMeta::new_test(1, "hi", "foo", "bar"), + Instruction::InvalidateCaches(vec![CacheIdent::SchemaName(SchemaName { + catalog_name: catalog_name.to_string(), + schema_name: schema_name.to_string(), + })]), + )); + executor.handle(ctx).await.unwrap(); + + assert_eq!( + Some(Duration::from_secs(3)), + SchemaNameValue::try_from_raw_value( + &inner_kv.get(&schema_key).await.unwrap().unwrap().value + ) + .unwrap() + .unwrap() + .ttl + ); + } +} diff --git a/src/meta-srv/src/cache_invalidator.rs b/src/meta-srv/src/cache_invalidator.rs index 86b11ae007..22fe7449b9 100644 --- a/src/meta-srv/src/cache_invalidator.rs +++ b/src/meta-srv/src/cache_invalidator.rs @@ -44,7 +44,7 @@ impl MetasrvCacheInvalidator { .clone() .unwrap_or_else(|| DEFAULT_SUBJECT.to_string()); - let msg = &MailboxMessage::json_message( + let mut msg = MailboxMessage::json_message( subject, &format!("Metasrv@{}", self.info.server_addr), "Frontend broadcast", @@ -54,22 +54,21 @@ impl MetasrvCacheInvalidator { .with_context(|_| meta_error::SerdeJsonSnafu)?; self.mailbox - .broadcast(&BroadcastChannel::Frontend, msg) + .broadcast(&BroadcastChannel::Frontend, &msg) .await .map_err(BoxedError::new) .context(meta_error::ExternalSnafu)?; - let msg = &MailboxMessage::json_message( - subject, - &format!("Metasrv@{}", self.info.server_addr), - "Flownode broadcast", - common_time::util::current_time_millis(), - &instruction, - ) - .with_context(|_| meta_error::SerdeJsonSnafu)?; - + msg.to = "Datanode broadcast".to_string(); self.mailbox - .broadcast(&BroadcastChannel::Flownode, msg) + .broadcast(&BroadcastChannel::Datanode, &msg) + .await + .map_err(BoxedError::new) + .context(meta_error::ExternalSnafu)?; + + msg.to = "Flownode broadcast".to_string(); + self.mailbox + .broadcast(&BroadcastChannel::Flownode, &msg) .await .map_err(BoxedError::new) .context(meta_error::ExternalSnafu) diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index 27cef2292e..0705f91fd2 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -20,7 +20,7 @@ use std::time::Duration; use api::v1::region::region_server::RegionServer; use arrow_flight::flight_service_server::FlightServiceServer; use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry}; -use catalog::kvbackend::{CachedMetaKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend}; +use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend}; use client::client_manager::NodeClients; use client::Client; use cmd::DistributedInformationExtension; @@ -351,8 +351,9 @@ impl GreptimeDbClusterBuilder { meta_client.start(&[&metasrv.server_addr]).await.unwrap(); let meta_client = Arc::new(meta_client); - let cached_meta_backend = - Arc::new(CachedMetaKvBackendBuilder::new(meta_client.clone()).build()); + let cached_meta_backend = Arc::new( + CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone()))).build(), + ); let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry( CacheRegistryBuilder::default()