diff --git a/src/meta-srv/src/election/rds/mysql.rs b/src/meta-srv/src/election/rds/mysql.rs index a15d1bac51..78832e3e11 100644 --- a/src/meta-srv/src/election/rds/mysql.rs +++ b/src/meta-srv/src/election/rds/mysql.rs @@ -1651,6 +1651,41 @@ mod tests { drop_table(&leader_mysql_election.client, table_name).await; } + #[tokio::test] + async fn test_reset_campaign() { + maybe_skip_mysql_integration_test!(); + common_telemetry::init_default_ut_logging(); + let leader_value = "test_leader".to_string(); + let uuid = uuid::Uuid::new_v4().to_string(); + let table_name = "test_reset_campaign_greptime_metakv"; + let candidate_lease_ttl = Duration::from_secs(5); + let meta_lease_ttl = Duration::from_secs(2); + let execution_timeout = Duration::from_secs(10); + let idle_session_timeout = Duration::from_secs(0); + let client = create_mysql_client(Some(table_name), execution_timeout, idle_session_timeout) + .await + .unwrap(); + + let (tx, _) = broadcast::channel(100); + let leader_mysql_election = MySqlElection { + leader_value, + client, + is_leader: AtomicBool::new(false), + leader_infancy: AtomicBool::new(true), + leader_watcher: tx, + store_key_prefix: uuid, + candidate_lease_ttl, + meta_lease_ttl, + sql_set: ElectionSqlFactory::new(table_name).build(), + }; + leader_mysql_election + .is_leader + .store(true, Ordering::Relaxed); + leader_mysql_election.reset_campaign().await; + assert!(!leader_mysql_election.is_leader()); + drop_table(&leader_mysql_election.client, table_name).await; + } + #[tokio::test] async fn test_follower_action() { maybe_skip_mysql_integration_test!(); diff --git a/src/meta-srv/src/election/rds/postgres.rs b/src/meta-srv/src/election/rds/postgres.rs index 5d8a8bf2fa..77bcd30dfe 100644 --- a/src/meta-srv/src/election/rds/postgres.rs +++ b/src/meta-srv/src/election/rds/postgres.rs @@ -1582,6 +1582,44 @@ mod tests { drop_table(&follower_pg_election, table_name).await; } + #[tokio::test] + async fn test_reset_campaign() { + maybe_skip_postgres_integration_test!(); + let leader_value = "test_leader".to_string(); + let uuid = uuid::Uuid::new_v4().to_string(); + let table_name = "test_reset_campaign_greptime_metakv"; + let candidate_lease_ttl = Duration::from_secs(5); + let execution_timeout = Duration::from_secs(10); + let statement_timeout = Duration::from_secs(10); + let meta_lease_ttl = Duration::from_secs(2); + let idle_session_timeout = Duration::from_secs(0); + let client = create_postgres_client( + Some(table_name), + execution_timeout, + idle_session_timeout, + statement_timeout, + ) + .await + .unwrap(); + + let (tx, _) = broadcast::channel(100); + let leader_pg_election = PgElection { + leader_value, + pg_client: RwLock::new(client), + is_leader: AtomicBool::new(false), + leader_infancy: AtomicBool::new(true), + leader_watcher: tx, + store_key_prefix: uuid, + candidate_lease_ttl, + meta_lease_ttl, + sql_set: ElectionSqlFactory::new(28321, None, table_name).build(), + }; + leader_pg_election.is_leader.store(true, Ordering::Relaxed); + leader_pg_election.reset_campaign().await; + assert!(!leader_pg_election.is_leader()); + drop_table(&leader_pg_election, table_name).await; + } + #[tokio::test] async fn test_idle_session_timeout() { maybe_skip_postgres_integration_test!(); diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index 680af35a06..4483b0ecdd 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -129,18 +129,27 @@ impl HeartbeatHandler for RegionLeaseHandler { #[cfg(test)] mod test { + use std::any::Any; use std::collections::{HashMap, HashSet}; use std::sync::Arc; use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat}; use common_meta::distributed_time_constants; + use common_meta::error::Result as MetaResult; use common_meta::key::TableMetadataManager; use common_meta::key::table_route::TableRouteValue; use common_meta::key::test_utils::new_test_table_info; use common_meta::kv_backend::memory::MemoryKvBackend; + use common_meta::kv_backend::txn::{Txn, TxnResponse}; + use common_meta::kv_backend::{KvBackend, TxnService}; use common_meta::peer::Peer; use common_meta::region_keeper::MemoryRegionKeeper; use common_meta::rpc::router::{LeaderState, Region, RegionRoute}; + use common_meta::rpc::store::{ + BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, + BatchPutRequest, BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest, + PutResponse, RangeRequest, RangeResponse, + }; use store_api::region_engine::RegionRole; use store_api::storage::RegionId; @@ -415,4 +424,102 @@ mod test { assert_eq!(granted, expected); } + + struct MockKvBackend; + + #[async_trait::async_trait] + impl TxnService for MockKvBackend { + type Error = common_meta::error::Error; + + async fn txn(&self, _txn: Txn) -> MetaResult { + unimplemented!() + } + + fn max_txn_ops(&self) -> usize { + unimplemented!() + } + } + + #[async_trait::async_trait] + impl KvBackend for MockKvBackend { + fn name(&self) -> &str { + "mock_kv_backend" + } + + fn as_any(&self) -> &dyn Any { + self + } + + async fn range(&self, _req: RangeRequest) -> MetaResult { + unimplemented!() + } + + async fn put(&self, _req: PutRequest) -> MetaResult { + unimplemented!() + } + + async fn batch_put(&self, _req: BatchPutRequest) -> MetaResult { + unimplemented!() + } + + async fn batch_get(&self, _req: BatchGetRequest) -> MetaResult { + common_meta::error::UnexpectedSnafu { + err_msg: "mock err", + } + .fail() + } + + async fn delete_range(&self, _req: DeleteRangeRequest) -> MetaResult { + unimplemented!() + } + + async fn batch_delete(&self, _req: BatchDeleteRequest) -> MetaResult { + unimplemented!() + } + } + + #[tokio::test] + async fn test_handle_renew_region_lease_failure() { + common_telemetry::init_default_ut_logging(); + let kvbackend = Arc::new(MockKvBackend); + let table_metadata_manager = Arc::new(TableMetadataManager::new(kvbackend)); + + let datanode_id = 1; + let region_number = 1u32; + let table_id = 10; + let region_id = RegionId::new(table_id, region_number); + let another_region_id = RegionId::new(table_id, region_number + 1); + let no_exist_region_id = RegionId::new(table_id, region_number + 2); + let peer = Peer::empty(datanode_id); + + let builder = MetasrvBuilder::new(); + let metasrv = builder.build().await.unwrap(); + let ctx = &mut metasrv.new_ctx(); + + let req = HeartbeatRequest { + duration_since_epoch: 1234, + ..Default::default() + }; + + let acc = &mut HeartbeatAccumulator::default(); + acc.stat = Some(Stat { + id: peer.id, + region_stats: vec![ + new_empty_region_stat(region_id, RegionRole::Leader), + new_empty_region_stat(another_region_id, RegionRole::Leader), + new_empty_region_stat(no_exist_region_id, RegionRole::Leader), + ], + ..Default::default() + }); + let handler = RegionLeaseHandler::new( + distributed_time_constants::REGION_LEASE_SECS, + table_metadata_manager.clone(), + Default::default(), + None, + ); + handler.handle(&req, ctx, acc).await.unwrap(); + + assert!(acc.region_lease.is_none()); + assert!(acc.inactive_region_ids.is_empty()); + } } diff --git a/src/meta-srv/src/region/supervisor.rs b/src/meta-srv/src/region/supervisor.rs index 97acaf4b07..132aa8a7c5 100644 --- a/src/meta-srv/src/region/supervisor.rs +++ b/src/meta-srv/src/region/supervisor.rs @@ -131,7 +131,7 @@ pub struct RegionSupervisorTicker { tick_handle: Mutex>>, /// The [`Option`] wrapper allows us to abort the job while dropping the [`RegionSupervisor`]. - initialization_handler: Mutex>>, + initialization_handle: Mutex>>, /// The interval of tick. tick_interval: Duration, @@ -176,7 +176,7 @@ impl RegionSupervisorTicker { ); Self { tick_handle: Mutex::new(None), - initialization_handler: Mutex::new(None), + initialization_handle: Mutex::new(None), tick_interval, initialization_delay, initialization_retry_period, @@ -213,7 +213,7 @@ impl RegionSupervisorTicker { } } }); - *self.initialization_handler.lock().unwrap() = Some(initialization_handler); + *self.initialization_handle.lock().unwrap() = Some(initialization_handler); let sender = self.sender.clone(); let ticker_loop = tokio::spawn(async move { @@ -243,7 +243,7 @@ impl RegionSupervisorTicker { handle.abort(); info!("The tick loop is stopped."); } - let initialization_handler = self.initialization_handler.lock().unwrap().take(); + let initialization_handler = self.initialization_handle.lock().unwrap().take(); if let Some(initialization_handler) = initialization_handler { initialization_handler.abort(); info!("The initialization loop is stopped."); @@ -929,7 +929,7 @@ pub(crate) mod tests { let (tx, mut rx) = tokio::sync::mpsc::channel(128); let ticker = RegionSupervisorTicker { tick_handle: Mutex::new(None), - initialization_handler: Mutex::new(None), + initialization_handle: Mutex::new(None), tick_interval: Duration::from_millis(10), initialization_delay: Duration::from_millis(100), initialization_retry_period: Duration::from_millis(100), @@ -947,6 +947,8 @@ pub(crate) mod tests { Event::Tick | Event::Clear | Event::InitializeAllRegions(_) ); } + assert!(ticker.initialization_handle.lock().unwrap().is_none()); + assert!(ticker.tick_handle.lock().unwrap().is_none()); } } @@ -956,7 +958,7 @@ pub(crate) mod tests { let (tx, mut rx) = tokio::sync::mpsc::channel(128); let ticker = RegionSupervisorTicker { tick_handle: Mutex::new(None), - initialization_handler: Mutex::new(None), + initialization_handle: Mutex::new(None), tick_interval: Duration::from_millis(1000), initialization_delay: Duration::from_millis(50), initialization_retry_period: Duration::from_millis(50),