feat: invalidate cache via invalidator on region migration (#4682)

feat: invalidate table via invalidator on region migration
This commit is contained in:
jeremyhi
2024-09-05 14:15:38 +08:00
committed by GitHub
parent 86cef648cd
commit 8814695b58
6 changed files with 36 additions and 20 deletions

View File

@@ -148,6 +148,7 @@ impl HeartbeatHandler for CollectStatsHandler {
mod tests {
use std::sync::Arc;
use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::sequence::SequenceBuilder;
@@ -184,6 +185,7 @@ mod tests {
election: None,
is_infancy: false,
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
cache_invalidator: Arc::new(DummyCacheInvalidator),
};
let handler = CollectStatsHandler::default();

View File

@@ -49,6 +49,7 @@ mod tests {
use std::sync::Arc;
use api::v1::meta::{HeartbeatResponse, RequestHeader};
use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::sequence::SequenceBuilder;
@@ -85,6 +86,7 @@ mod tests {
election: None,
is_infancy: false,
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
cache_invalidator: Arc::new(DummyCacheInvalidator),
};
let req = HeartbeatRequest {

View File

@@ -25,6 +25,7 @@ use common_base::Plugins;
use common_config::Configurable;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_grpc::channel_manager;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl::ProcedureExecutorRef;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
@@ -212,6 +213,7 @@ pub struct Context {
pub election: Option<ElectionRef>,
pub is_infancy: bool,
pub table_metadata_manager: TableMetadataManagerRef,
pub cache_invalidator: CacheInvalidatorRef,
}
impl Context {
@@ -376,6 +378,7 @@ pub struct Metasrv {
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
region_migration_manager: RegionMigrationManagerRef,
region_supervisor_ticker: Option<RegionSupervisorTickerRef>,
cache_invalidator: CacheInvalidatorRef,
plugins: Plugins,
}
@@ -617,6 +620,7 @@ impl Metasrv {
let mailbox = self.mailbox.clone();
let election = self.election.clone();
let table_metadata_manager = self.table_metadata_manager.clone();
let cache_invalidator = self.cache_invalidator.clone();
Context {
server_addr,
@@ -628,6 +632,7 @@ impl Metasrv {
election,
is_infancy: false,
table_metadata_manager,
cache_invalidator,
}
}
}

View File

@@ -319,6 +319,7 @@ impl MetasrvBuilder {
region_failure_detector_controller.clone(),
mailbox.clone(),
options.server_addr.clone(),
cache_invalidator.clone(),
),
));
region_migration_manager.try_start()?;
@@ -346,7 +347,7 @@ impl MetasrvBuilder {
DdlManager::try_new(
DdlContext {
node_manager,
cache_invalidator,
cache_invalidator: cache_invalidator.clone(),
memory_region_keeper: memory_region_keeper.clone(),
table_metadata_manager: table_metadata_manager.clone(),
table_metadata_allocator: table_metadata_allocator.clone(),
@@ -434,6 +435,7 @@ impl MetasrvBuilder {
memory_region_keeper,
region_migration_manager,
region_supervisor_ticker,
cache_invalidator,
})
}
}

View File

@@ -27,10 +27,10 @@ use std::any::Any;
use std::fmt::Debug;
use std::time::Duration;
use api::v1::meta::MailboxMessage;
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl::RegionFailureDetectorControllerRef;
use common_meta::instruction::{CacheIdent, Instruction};
use common_meta::instruction::CacheIdent;
use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue};
use common_meta::key::table_info::TableInfoValue;
use common_meta::key::table_route::TableRouteValue;
@@ -52,7 +52,7 @@ use tokio::time::Instant;
use self::migration_start::RegionMigrationStart;
use crate::error::{self, Result};
use crate::service::mailbox::{BroadcastChannel, MailboxRef};
use crate::service::mailbox::MailboxRef;
/// It's shared in each step and available even after recovering.
///
@@ -158,6 +158,7 @@ pub struct DefaultContextFactory {
region_failure_detector_controller: RegionFailureDetectorControllerRef,
mailbox: MailboxRef,
server_addr: String,
cache_invalidator: CacheInvalidatorRef,
}
impl DefaultContextFactory {
@@ -168,6 +169,7 @@ impl DefaultContextFactory {
region_failure_detector_controller: RegionFailureDetectorControllerRef,
mailbox: MailboxRef,
server_addr: String,
cache_invalidator: CacheInvalidatorRef,
) -> Self {
Self {
volatile_ctx: VolatileContext::default(),
@@ -176,6 +178,7 @@ impl DefaultContextFactory {
region_failure_detector_controller,
mailbox,
server_addr,
cache_invalidator,
}
}
}
@@ -190,6 +193,7 @@ impl ContextFactory for DefaultContextFactory {
region_failure_detector_controller: self.region_failure_detector_controller,
mailbox: self.mailbox,
server_addr: self.server_addr,
cache_invalidator: self.cache_invalidator,
}
}
}
@@ -203,6 +207,7 @@ pub struct Context {
region_failure_detector_controller: RegionFailureDetectorControllerRef,
mailbox: MailboxRef,
server_addr: String,
cache_invalidator: CacheInvalidatorRef,
}
impl Context {
@@ -356,22 +361,13 @@ impl Context {
/// Broadcasts the invalidate table cache message.
pub async fn invalidate_table_cache(&self) -> Result<()> {
let table_id = self.region_id().table_id();
let instruction = Instruction::InvalidateCaches(vec![CacheIdent::TableId(table_id)]);
let msg = &MailboxMessage::json_message(
"Invalidate Table Cache",
&format!("Metasrv@{}", self.server_addr()),
"Frontend broadcast",
common_time::util::current_time_millis(),
&instruction,
)
.with_context(|_| error::SerializeToJsonSnafu {
input: instruction.to_string(),
})?;
self.mailbox
.broadcast(&BroadcastChannel::Frontend, msg)
.await
// ignore the result
let ctx = common_meta::cache_invalidator::Context::default();
let _ = self
.cache_invalidator
.invalidate(&ctx, &[CacheIdent::TableId(table_id)])
.await;
Ok(())
}
}
@@ -497,6 +493,7 @@ mod tests {
use std::sync::Arc;
use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::instruction::Instruction;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::rpc::router::{Region, RegionRoute};

View File

@@ -48,8 +48,10 @@ use super::manager::RegionMigrationProcedureTracker;
use super::migration_abort::RegionMigrationAbort;
use super::upgrade_candidate_region::UpgradeCandidateRegion;
use super::{Context, ContextFactory, DefaultContextFactory, State, VolatileContext};
use crate::cache_invalidator::MetasrvCacheInvalidator;
use crate::error::{self, Error, Result};
use crate::handler::{HeartbeatMailbox, Pusher, Pushers};
use crate::metasrv::MetasrvInfo;
use crate::procedure::region_migration::downgrade_leader_region::DowngradeLeaderRegion;
use crate::procedure::region_migration::migration_end::RegionMigrationEnd;
use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion;
@@ -152,6 +154,12 @@ impl TestingEnv {
mailbox: self.mailbox_ctx.mailbox().clone(),
server_addr: self.server_addr.to_string(),
region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
cache_invalidator: Arc::new(MetasrvCacheInvalidator::new(
self.mailbox_ctx.mailbox.clone(),
MetasrvInfo {
server_addr: self.server_addr.to_string(),
},
)),
}
}