diff --git a/src/meta-srv/src/handler/collect_stats_handler.rs b/src/meta-srv/src/handler/collect_stats_handler.rs index 1389e6896f..ca4df868d1 100644 --- a/src/meta-srv/src/handler/collect_stats_handler.rs +++ b/src/meta-srv/src/handler/collect_stats_handler.rs @@ -148,6 +148,7 @@ impl HeartbeatHandler for CollectStatsHandler { mod tests { use std::sync::Arc; + use common_meta::cache_invalidator::DummyCacheInvalidator; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::sequence::SequenceBuilder; @@ -184,6 +185,7 @@ mod tests { election: None, is_infancy: false, table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())), + cache_invalidator: Arc::new(DummyCacheInvalidator), }; let handler = CollectStatsHandler::default(); diff --git a/src/meta-srv/src/handler/response_header_handler.rs b/src/meta-srv/src/handler/response_header_handler.rs index 8bcc7e9a13..baa7e7ee29 100644 --- a/src/meta-srv/src/handler/response_header_handler.rs +++ b/src/meta-srv/src/handler/response_header_handler.rs @@ -49,6 +49,7 @@ mod tests { use std::sync::Arc; use api::v1::meta::{HeartbeatResponse, RequestHeader}; + use common_meta::cache_invalidator::DummyCacheInvalidator; use common_meta::key::TableMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::sequence::SequenceBuilder; @@ -85,6 +86,7 @@ mod tests { election: None, is_infancy: false, table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())), + cache_invalidator: Arc::new(DummyCacheInvalidator), }; let req = HeartbeatRequest { diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index d2a8073408..2beb09859b 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -25,6 +25,7 @@ use common_base::Plugins; use common_config::Configurable; use common_greptimedb_telemetry::GreptimeDBTelemetryTask; use common_grpc::channel_manager; +use common_meta::cache_invalidator::CacheInvalidatorRef; use common_meta::ddl::ProcedureExecutorRef; use common_meta::key::TableMetadataManagerRef; use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef}; @@ -212,6 +213,7 @@ pub struct Context { pub election: Option, pub is_infancy: bool, pub table_metadata_manager: TableMetadataManagerRef, + pub cache_invalidator: CacheInvalidatorRef, } impl Context { @@ -376,6 +378,7 @@ pub struct Metasrv { greptimedb_telemetry_task: Arc, region_migration_manager: RegionMigrationManagerRef, region_supervisor_ticker: Option, + cache_invalidator: CacheInvalidatorRef, plugins: Plugins, } @@ -617,6 +620,7 @@ impl Metasrv { let mailbox = self.mailbox.clone(); let election = self.election.clone(); let table_metadata_manager = self.table_metadata_manager.clone(); + let cache_invalidator = self.cache_invalidator.clone(); Context { server_addr, @@ -628,6 +632,7 @@ impl Metasrv { election, is_infancy: false, table_metadata_manager, + cache_invalidator, } } } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index b6d1251c72..6b06bab867 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -319,6 +319,7 @@ impl MetasrvBuilder { region_failure_detector_controller.clone(), mailbox.clone(), options.server_addr.clone(), + cache_invalidator.clone(), ), )); region_migration_manager.try_start()?; @@ -346,7 +347,7 @@ impl MetasrvBuilder { DdlManager::try_new( DdlContext { node_manager, - cache_invalidator, + cache_invalidator: cache_invalidator.clone(), memory_region_keeper: memory_region_keeper.clone(), table_metadata_manager: table_metadata_manager.clone(), table_metadata_allocator: table_metadata_allocator.clone(), @@ -434,6 +435,7 @@ impl MetasrvBuilder { memory_region_keeper, region_migration_manager, region_supervisor_ticker, + cache_invalidator, }) } } diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs index fe4dec7dda..123a489cd5 100644 --- a/src/meta-srv/src/procedure/region_migration.rs +++ b/src/meta-srv/src/procedure/region_migration.rs @@ -27,10 +27,10 @@ use std::any::Any; use std::fmt::Debug; use std::time::Duration; -use api::v1::meta::MailboxMessage; use common_error::ext::BoxedError; +use common_meta::cache_invalidator::CacheInvalidatorRef; use common_meta::ddl::RegionFailureDetectorControllerRef; -use common_meta::instruction::{CacheIdent, Instruction}; +use common_meta::instruction::CacheIdent; use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue}; use common_meta::key::table_info::TableInfoValue; use common_meta::key::table_route::TableRouteValue; @@ -52,7 +52,7 @@ use tokio::time::Instant; use self::migration_start::RegionMigrationStart; use crate::error::{self, Result}; -use crate::service::mailbox::{BroadcastChannel, MailboxRef}; +use crate::service::mailbox::MailboxRef; /// It's shared in each step and available even after recovering. /// @@ -158,6 +158,7 @@ pub struct DefaultContextFactory { region_failure_detector_controller: RegionFailureDetectorControllerRef, mailbox: MailboxRef, server_addr: String, + cache_invalidator: CacheInvalidatorRef, } impl DefaultContextFactory { @@ -168,6 +169,7 @@ impl DefaultContextFactory { region_failure_detector_controller: RegionFailureDetectorControllerRef, mailbox: MailboxRef, server_addr: String, + cache_invalidator: CacheInvalidatorRef, ) -> Self { Self { volatile_ctx: VolatileContext::default(), @@ -176,6 +178,7 @@ impl DefaultContextFactory { region_failure_detector_controller, mailbox, server_addr, + cache_invalidator, } } } @@ -190,6 +193,7 @@ impl ContextFactory for DefaultContextFactory { region_failure_detector_controller: self.region_failure_detector_controller, mailbox: self.mailbox, server_addr: self.server_addr, + cache_invalidator: self.cache_invalidator, } } } @@ -203,6 +207,7 @@ pub struct Context { region_failure_detector_controller: RegionFailureDetectorControllerRef, mailbox: MailboxRef, server_addr: String, + cache_invalidator: CacheInvalidatorRef, } impl Context { @@ -356,22 +361,13 @@ impl Context { /// Broadcasts the invalidate table cache message. pub async fn invalidate_table_cache(&self) -> Result<()> { let table_id = self.region_id().table_id(); - let instruction = Instruction::InvalidateCaches(vec![CacheIdent::TableId(table_id)]); - - let msg = &MailboxMessage::json_message( - "Invalidate Table Cache", - &format!("Metasrv@{}", self.server_addr()), - "Frontend broadcast", - common_time::util::current_time_millis(), - &instruction, - ) - .with_context(|_| error::SerializeToJsonSnafu { - input: instruction.to_string(), - })?; - - self.mailbox - .broadcast(&BroadcastChannel::Frontend, msg) - .await + // ignore the result + let ctx = common_meta::cache_invalidator::Context::default(); + let _ = self + .cache_invalidator + .invalidate(&ctx, &[CacheIdent::TableId(table_id)]) + .await; + Ok(()) } } @@ -497,6 +493,7 @@ mod tests { use std::sync::Arc; use common_meta::distributed_time_constants::REGION_LEASE_SECS; + use common_meta::instruction::Instruction; use common_meta::key::test_utils::new_test_table_info; use common_meta::rpc::router::{Region, RegionRoute}; diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs index 1fb63b4a34..edfb89515f 100644 --- a/src/meta-srv/src/procedure/region_migration/test_util.rs +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -48,8 +48,10 @@ use super::manager::RegionMigrationProcedureTracker; use super::migration_abort::RegionMigrationAbort; use super::upgrade_candidate_region::UpgradeCandidateRegion; use super::{Context, ContextFactory, DefaultContextFactory, State, VolatileContext}; +use crate::cache_invalidator::MetasrvCacheInvalidator; use crate::error::{self, Error, Result}; use crate::handler::{HeartbeatMailbox, Pusher, Pushers}; +use crate::metasrv::MetasrvInfo; use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion; use crate::procedure::region_migration::migration_end::RegionMigrationEnd; use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion; @@ -152,6 +154,12 @@ impl TestingEnv { mailbox: self.mailbox_ctx.mailbox().clone(), server_addr: self.server_addr.to_string(), region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl), + cache_invalidator: Arc::new(MetasrvCacheInvalidator::new( + self.mailbox_ctx.mailbox.clone(), + MetasrvInfo { + server_addr: self.server_addr.to_string(), + }, + )), } }